You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/02 12:56:44 UTC

svn commit: r1368413 [4/5] - in /camel/trunk: components/ components/camel-sjms/ components/camel-sjms/src/ components/camel-sjms/src/main/ components/camel-sjms/src/main/java/ components/camel-sjms/src/main/java/org/ components/camel-sjms/src/main/jav...

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.consumer;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.jms.JmsMessageHeaderType;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TransactedInOnlyTopicConsumerTest extends CamelTestSupport {
+    private static final String TEST_DESTINATION_1 = "sjms:topic:transacted.in.only.topic.consumer.test.1?transacted=true";
+    private static final String TEST_DESTINATION_2 = "sjms:topic:transacted.in.only.topic.consumer.test.2?transacted=true";
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+    
+    @Test
+    public void testTransactedInOnlyTopicConsumerExchangeFailure() throws Exception {
+        // We should see the World message twice, once for the exception
+        getMockEndpoint("mock:test1.topic.mock.before").expectedBodiesReceived("World", "World");
+        getMockEndpoint("mock:test1.topic.mock.after").expectedBodiesReceived("Hello World");
+        
+        template.sendBody(TEST_DESTINATION_1, "World");
+        
+        getMockEndpoint("mock:test1.topic.mock.before").assertIsSatisfied();
+        getMockEndpoint("mock:test1.topic.mock.after").assertIsSatisfied();
+
+    }
+
+    @Test
+    public void testTransactedInOnlyTopicConsumerRuntimeException() throws Exception {
+        // We should see the World message twice, once for the exception
+        getMockEndpoint("mock:test2.topic.mock.before").expectedBodiesReceived("World", "World");
+        getMockEndpoint("mock:test2.topic.mock.after").expectedBodiesReceived("Hello World");
+        
+        template.sendBody(TEST_DESTINATION_2, "World");
+        
+        getMockEndpoint("mock:test2.topic.mock.before").assertIsSatisfied();
+        getMockEndpoint("mock:test2.topic.mock.after").assertIsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                "vm://broker?broker.persistent=false&broker.useJmx=true");
+        SjmsComponent component = new SjmsComponent();
+        component.setConnectionFactory(connectionFactory);
+        camelContext.addComponent("sjms", component);
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from(TEST_DESTINATION_1)
+                    .to("log:test1.before")
+                    .to("mock:test1.topic.mock.before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            logger.info("Begin processing Exchange ID: {}", exchange.getExchangeId());
+                            if (!exchange.getIn().getHeader(JmsMessageHeaderType.JMSRedelivered.toString(), String.class).equalsIgnoreCase("true")) {
+                                logger.info("Exchange does not have a retry message.  Set the exception and allow the retry.");
+                                exchange.setException(new RuntimeCamelException("Creating Failure"));
+                            } else {
+                                logger.info("Exchange has retry header.  Continue processing the message.");
+                            }
+                        }
+                    })
+                    .transform(body().prepend("Hello "))
+                    .to("log:test1.after?showAll=true", "mock:test1.topic.mock.after");
+                
+                from(TEST_DESTINATION_2)
+                    .to("log:test2.before")
+                    .to("mock:test2.topic.mock.before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            logger.info("Begin processing Exchange ID: {}", exchange.getExchangeId());
+                            if (!exchange.getIn().getHeader(JmsMessageHeaderType.JMSRedelivered.toString(), String.class).equalsIgnoreCase("true")) {
+                                logger.info("Exchange does not have a retry message.  Throw the exception to verify we handle the retry.");
+                                throw new RuntimeCamelException("Creating Failure");
+                            } else {
+                                logger.info("Exchange has retry header.  Continue processing the message.");
+                            }
+                        }
+                    })
+                    .transform(body().prepend("Hello "))
+                    .to("log:test2.after?showAll=true", "mock:test2.topic.mock.after");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.it;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.util.StopWatch;
+
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class AsyncJmsInOutIT extends JmsTestSupport {
+
+    @Test
+    public void testAsynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(100);
+        mock.expectsNoDuplicates(body());
+
+        StopWatch watch = new StopWatch();
+
+        for (int i = 0; i < 100; i++) {
+            template.sendBody("seda:start", "" + i);
+        }
+
+        // just in case we run on slow boxes
+        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
+
+        log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
+                // (there are delays in both routes)
+                // however due async routing, we can leverage the fact to let threads non blocked
+                // in the first route, and therefore can have the messages processed faster
+                // because we can have messages wait concurrently in both routes
+                // this means the async processing model is about 2x faster
+
+                from("seda:start")
+                    // we can only send at fastest the 100 msg in 5 sec due the delay
+                    .delay(50)
+                    .to("sjms:queue:bar?synchronous=false&exchangePattern=InOut")
+                    .to("mock:result");
+
+                from("sjms:queue:bar?synchronous=false&exchangePattern=InOut")
+                    .log("Using ${threadName} to process ${body}")
+                    // we can only process at fastest the 100 msg in 5 sec due the delay
+                    .delay(50)
+                    .transform(body().prepend("Bye "));
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.it;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.util.StopWatch;
+
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class AsyncJmsInOutTempDestIT extends JmsTestSupport {
+
+    @Test
+    public void testAsynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(100);
+        mock.expectsNoDuplicates(body());
+
+        StopWatch watch = new StopWatch();
+
+        for (int i = 0; i < 100; i++) {
+            template.sendBody("seda:start", "" + i);
+        }
+
+        // just in case we run on slow boxes
+        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
+
+        log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
+                // (there are delays in both routes)
+                // however due async routing, we can leverage the fact to let threads non blocked
+                // in the first route, and therefore can have the messages processed faster
+                // because we can have messages wait concurrently in both routes
+                // this means the async processing model is about 2x faster
+
+                from("seda:start")
+                    // we can only send at fastest the 100 msg in 5 sec due the delay
+                    .delay(50)
+                    .to("sjms:in.out.temp.queue?exchangePattern=InOut&synchronous=false")
+                    .to("mock:result");
+
+                from("sjms:in.out.temp.queue?exchangePattern=InOut&synchronous=false")
+                    .log("Using ${threadName} to process ${body}")
+                    // we can only process at fastest the 100 msg in 5 sec due the delay
+                    .delay(50)
+                    .transform(body().prepend("Bye "));
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.it;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.util.StopWatch;
+
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class SyncJmsInOutIT extends JmsTestSupport {
+
+    @Test
+    public void testSynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(100);
+        mock.expectsNoDuplicates(body());
+
+        StopWatch watch = new StopWatch();
+
+        for (int i = 0; i < 100; i++) {
+            template.sendBody("seda:start", "" + i);
+        }
+
+        // just in case we run on slow boxes
+        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
+
+        log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
+                // (there are delays in both routes)
+                // however due async routing, we can leverage the fact to let threads non blocked
+                // in the first route, and therefore can have the messages processed faster
+                // because we can have messages wait concurrently in both routes
+                // this means the async processing model is about 2x faster
+
+                from("seda:start")
+                    // we can only send at fastest the 100 msg in 5 sec due the delay
+                    .delay(50)
+                    .to("sjms:queue:bar?exchangePattern=InOut")
+                    .to("mock:result");
+
+                from("sjms:queue:bar?exchangePattern=InOut")
+                    .log("Using ${threadName} to process ${body}")
+                    // we can only process at fastest the 100 msg in 5 sec due the delay
+                    .delay(50)
+                    .transform(body().prepend("Bye "));
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.it;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.util.StopWatch;
+
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class SyncJmsInOutTempDestIT extends JmsTestSupport {
+
+    @Test
+    public void testSynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(100);
+        mock.expectsNoDuplicates(body());
+
+        StopWatch watch = new StopWatch();
+
+        for (int i = 0; i < 100; i++) {
+            template.sendBody("seda:start", "" + i);
+        }
+
+        // just in case we run on slow boxes
+        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
+
+        log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
+                // (there are delays in both routes)
+                // however due async routing, we can leverage the fact to let threads non blocked
+                // in the first route, and therefore can have the messages processed faster
+                // because we can have messages wait concurrently in both routes
+                // this means the async processing model is about 2x faster
+
+                from("seda:start")
+                    // we can only send at fastest the 100 msg in 5 sec due the delay
+                    .delay(50)
+                    .to("sjms:in.out.temp.queue?exchangePattern=InOut")
+                    .to("mock:result");
+
+                from("sjms:in.out.temp.queue?exchangePattern=InOut")
+                    .log("Using ${threadName} to process ${body}")
+                    // we can only process at fastest the 100 msg in 5 sec due the delay
+                    .delay(50)
+                    .transform(body().prepend("Bye "));
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/DefaultConnectionResourceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/DefaultConnectionResourceTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/DefaultConnectionResourceTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/DefaultConnectionResourceTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TODO Add Class documentation for DefaultConnectionResourceTest
+ * 
+ */
+public class DefaultConnectionResourceTest {
+    private ActiveMQConnectionFactory connectionFactory;
+
+    @Before
+    public void setup() {
+        connectionFactory = new ActiveMQConnectionFactory(
+                "vm://broker?broker.persistent=false");
+    }
+
+    @After
+    public void teardown() {
+        connectionFactory = null;
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.DefaultConnectionResourceTest#createObject()}
+     * .
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testCreateObject() throws Exception {
+        DefaultConnectionResource pool = new DefaultConnectionResource(1, connectionFactory);
+        pool.fillPool();
+        assertNotNull(pool);
+        ActiveMQConnection connection = (ActiveMQConnection) pool
+                .borrowObject();
+        assertNotNull(connection);
+        assertTrue(connection.isStarted());
+        pool.drainPool();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.DefaultConnectionResourceTest#createObject()}
+     * .
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testDestroyObject() throws Exception {
+        DefaultConnectionResource pool = new DefaultConnectionResource(1, connectionFactory);
+        pool.fillPool();
+        assertNotNull(pool);
+        ActiveMQConnection connection = (ActiveMQConnection) pool
+                .borrowObject();
+        assertNotNull(connection);
+        assertTrue(connection.isStarted());
+        pool.drainPool();
+        assertTrue(pool.size() == 0);
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.ObjectPool#borrowObject()}.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testBorrowObject() throws Exception {
+        DefaultConnectionResource pool = new DefaultConnectionResource(1, connectionFactory);
+        pool.fillPool();
+        assertNotNull(pool);
+        ActiveMQConnection connection = (ActiveMQConnection) pool.borrowObject();
+        assertNotNull(connection);
+        assertTrue(connection.isStarted());
+
+        ActiveMQConnection connection2 = (ActiveMQConnection) pool.borrowObject();
+        assertNull(connection2);
+        pool.drainPool();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.ObjectPool#returnObject(java.lang.Object)}
+     * .
+     * @throws Exception 
+     */
+    @Test
+    public void testReturnObject() throws Exception {
+        DefaultConnectionResource pool = new DefaultConnectionResource(1, connectionFactory);
+        pool.fillPool();
+        assertNotNull(pool);
+        ActiveMQConnection connection = (ActiveMQConnection) pool.borrowObject();
+        assertNotNull(connection);
+        assertTrue(connection.isStarted());
+        pool.returnObject(connection);
+        ActiveMQConnection connection2 = (ActiveMQConnection) pool.borrowObject();
+        assertNotNull(connection2);
+        pool.drainPool();
+    }
+
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/ObjectPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/ObjectPoolTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/ObjectPoolTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/ObjectPoolTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TODO Add Class documentation for ObjectPoolTest
+ * 
+ */
+public class ObjectPoolTest {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(ObjectPoolTest.class);
+    
+
+    private AtomicInteger atomicInteger;
+    
+    @Before
+    public void setUp() {
+        atomicInteger = new AtomicInteger();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.ObjectPool#ObjectPool()}.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testObjectPool() throws Exception {
+        TestPool testPool = new TestPool();
+        assertNotNull(testPool);
+        testPool.fillPool();
+        MyPooledObject pooledObject = testPool.borrowObject();
+        assertNotNull(pooledObject);
+        assertTrue("Expected a value of 1.  Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() == 1);
+        
+        MyPooledObject nextPooledObject = testPool.borrowObject();
+        assertNull(nextPooledObject);
+        
+        testPool.returnObject(pooledObject);
+        nextPooledObject = testPool.borrowObject();
+        assertNotNull(nextPooledObject);
+        testPool.drainPool();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.ObjectPool#ObjectPool()}.
+     */
+    @Test
+    public void testBadObjectPool() {
+        try {
+            new BadTestPool();
+        } catch (Exception e) {
+            assertTrue("Should have thrown an IllegalStateException", e instanceof IllegalStateException);
+        }
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.ObjectPool#ObjectPool(int)}.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testObjectPoolInt() throws Exception {
+        final int maxPoolObjects = 5;
+        
+        TestPool testPool = new TestPool(maxPoolObjects);
+        testPool.fillPool();
+        
+        List<MyPooledObject> poolObjects = new ArrayList<MyPooledObject>();
+        for (int i = 0; i < maxPoolObjects; i++) {
+            poolObjects.add(testPool.borrowObject());
+        }
+        for (int i = 0; i < maxPoolObjects; i++) {
+            MyPooledObject pooledObject = poolObjects.get(i);
+            assertNotNull("MyPooledObject was null for borrow attempt: " + i, pooledObject);
+            assertTrue("Expected a value in the range of 1-5.  Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() > 0 && pooledObject.getObjectId() < 6);
+            LOGGER.info("MyPooledObject has an ID of: " + pooledObject.getObjectId());
+        }
+        
+        assertNull("Pool should be empty", testPool.borrowObject());
+        
+        for (MyPooledObject myPooledObject : poolObjects) {
+            testPool.returnObject(myPooledObject);
+        }
+        
+        MyPooledObject pooledObject = testPool.borrowObject();
+        assertNotNull(pooledObject);
+        assertTrue("Expected a value in the range of 1-5.  Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() > 0 && pooledObject.getObjectId() < 6);
+        
+        testPool.drainPool();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.ObjectPool#createObject()}.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testCreateObject() throws Exception {
+        TestPool testPool = new TestPool();
+        assertNotNull(testPool.createObject());
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.ObjectPool#borrowObject()}.
+     * 
+     * @throws Exception 
+     */
+    @Test
+    public void testBorrowObject() throws Exception {
+        TestPool testPool = new TestPool();
+        testPool.fillPool();
+        MyPooledObject pooledObject = testPool.borrowObject();
+        assertNotNull(pooledObject);
+        assertTrue("Expected a value of 1.  Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() == 1);
+
+        MyPooledObject nextPooledObject = testPool.borrowObject();
+        assertNull("Expected a null as the pool of 1 was already removed", nextPooledObject);
+        testPool.drainPool();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.ObjectPool#returnObject(java.lang.Object)}
+     * .
+     * @throws Exception 
+     */
+    @Test
+    public void testReturnObject() throws Exception {
+        TestPool testPool = new TestPool();
+        testPool.fillPool();
+        assertNotNull(testPool);
+        MyPooledObject pooledObject = testPool.borrowObject();
+        MyPooledObject nextPooledObject = testPool.borrowObject();
+        testPool.returnObject(pooledObject);
+        nextPooledObject = testPool.borrowObject();
+        assertNotNull(nextPooledObject);
+        testPool.drainPool();
+    }
+
+    class TestPool extends ObjectPool<MyPooledObject> {
+        
+        public TestPool() {
+            super();
+        }
+
+        public TestPool(int poolSize) {
+            super(poolSize);
+        }
+
+        @Override
+        protected MyPooledObject createObject() throws Exception {
+            return new MyPooledObject(atomicInteger.incrementAndGet());
+        }
+
+        @Override
+        protected void destroyObject(MyPooledObject t) throws Exception {
+            t = null;
+        }
+
+    }
+    
+    class MyPooledObject {
+        private int objectId = -1;
+
+        public MyPooledObject(int objectId) {
+            this.objectId = objectId;
+        }
+
+        /**
+         * 
+         * @return the OBJECT_ID
+         */
+        public Integer getObjectId() {
+            return this.objectId;
+        }
+    }
+    
+
+    class BadTestPool extends ObjectPool<Object> {
+        @Override
+        protected Object createObject() throws Exception {
+            throw new Exception();
+        }
+
+        @Override
+        protected void destroyObject(Object t) throws Exception {
+            // TODO Auto-generated method stub
+
+        }
+        
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/SessionPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/SessionPoolTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/SessionPoolTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/SessionPoolTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TODO Add Class documentation for DefaultConnectionResourceTest
+ * 
+ */
+public class SessionPoolTest {
+    private ActiveMQConnectionFactory connectionFactory;
+
+    @Before
+    public void setup() {
+        connectionFactory = new ActiveMQConnectionFactory(
+                "vm://broker?broker.persistent=false");
+    }
+
+    @After
+    public void teardown() {
+        connectionFactory = null;
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.SessionPoolTest#createObject()}
+     * .
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testCreateObject() throws Exception {
+        DefaultConnectionResource connections = new DefaultConnectionResource(1, connectionFactory);
+        connections.fillPool();
+        SessionPool sessions = new SessionPool(1, connections);
+        sessions.fillPool();
+        assertNotNull(sessions);
+        Session session = sessions.createObject();
+        assertNotNull(session);
+        sessions.drainPool();
+        connections.drainPool();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.ObjectPool#borrowObject()}.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testBorrowObject() throws Exception {
+        DefaultConnectionResource connections = new DefaultConnectionResource(1, connectionFactory);
+        connections.fillPool();
+        SessionPool sessions = new SessionPool(1, connections);
+        sessions.fillPool();
+        assertNotNull(sessions);
+        ActiveMQSession session = (ActiveMQSession) sessions.borrowObject();
+        assertNotNull(session);
+        assertTrue(!session.isClosed());
+
+        ActiveMQSession session2 = (ActiveMQSession) sessions.borrowObject();
+        assertNull(session2);
+        sessions.drainPool();
+        connections.drainPool();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.pool.ObjectPool#returnObject(java.lang.Object)}
+     * .
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testReturnObject() throws Exception {
+        DefaultConnectionResource connections = new DefaultConnectionResource(1, connectionFactory);
+        connections.fillPool();
+        SessionPool sessions = new SessionPool(1, connections);
+        sessions.fillPool();
+        assertNotNull(sessions);
+        ActiveMQSession session = (ActiveMQSession) sessions.borrowObject();
+        assertNotNull(session);
+        assertTrue(!session.isClosed());
+
+        ActiveMQSession session2 = (ActiveMQSession) sessions.borrowObject();
+        assertNull(session2);
+
+        sessions.returnObject(session);
+        session2 = (ActiveMQSession) sessions.borrowObject();
+        assertNotNull(session2);
+
+        sessions.drainPool();
+        connections.drainPool();
+    }
+
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.support.MyAsyncComponent;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class AsyncQueueProducerTest extends CamelTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+    private static String sedaThreadName;
+    private static String route = "";
+
+    @Test
+    public void testAsyncJmsProducerEndpoint() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+        template.sendBody("direct:start", "Hello Camel");
+        // we should run before the async processor that sets B
+        route += "A";
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(sedaThreadName));
+        assertFalse("Should use different threads", afterThreadName.equalsIgnoreCase(sedaThreadName));
+
+        assertEquals("AB", route);
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                "vm://broker?broker.persistent=false&broker.useJmx=false");
+        SjmsComponent component = new SjmsComponent();
+        component.setConnectionFactory(connectionFactory);
+        camelContext.addComponent("sjms", component);
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                from("direct:start")
+                        .to("mock:before")
+                        .to("log:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                beforeThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("async:bye:camel")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                afterThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("sjms:queue:foo?synchronous=false");
+
+                from("sjms:queue:foo?synchronous=false")
+                        .to("mock:after")
+                        .to("log:after")
+                        .delay(1000)
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                route += "B";
+                                sedaThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("mock:result");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.support.MyAsyncComponent;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class AsyncTopicProducerTest extends CamelTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+    private static String sedaThreadName;
+    private static String route = "";
+
+    @Test
+    public void testAsyncTopicProducer() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+        template.sendBody("direct:start", "Hello Camel");
+        // we should run before the async processor that sets B
+        route += "A";
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(sedaThreadName));
+        assertFalse("Should use different threads", afterThreadName.equalsIgnoreCase(sedaThreadName));
+
+        assertEquals("AB", route);
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                "vm://broker?broker.persistent=false&broker.useJmx=false");
+        SjmsComponent component = new SjmsComponent();
+        component.setConnectionFactory(connectionFactory);
+        camelContext.addComponent("sjms", component);
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                from("direct:start")
+                        .to("mock:before")
+                        .to("log:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                beforeThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("async:bye:camel")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                afterThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("sjms:topic:foo?synchronous=false");
+
+                from("sjms:topic:foo")
+                        .to("mock:after")
+                        .to("log:after")
+                        .delay(1000)
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                route += "B";
+                                sedaThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("mock:result");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOnlyQueueProducerTest extends JmsTestSupport {
+    
+    private static final String TEST_DESTINATION_NAME = "sync.queue.producer.test";
+    
+    public InOnlyQueueProducerTest() {
+    }
+    
+    @Override
+    protected boolean useJmx() {
+        return false;
+    }
+
+    @Test
+    public void testInOnlyQueueProducer() throws Exception {
+        MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME);
+        assertNotNull(mc);
+        final String expectedBody = "Hello World!";
+        MockEndpoint mock = getMockEndpoint("mock:result");
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(expectedBody);
+
+        template.sendBody("direct:start", expectedBody);
+        Message message = mc.receive(5000);
+        assertNotNull(message);
+        assertTrue(message instanceof TextMessage);
+        
+        TextMessage tm = (TextMessage) message;
+        String text = tm.getText();
+        assertNotNull(text);
+        
+        template.sendBody("direct:finish", text);
+        
+        mock.assertIsSatisfied();
+        mc.close();
+
+    }
+
+    /*
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+     *
+     * @return
+     * @throws Exception
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .to("sjms:queue:" + TEST_DESTINATION_NAME);
+                
+                from("direct:finish")
+                    .to("log:test.log.1?showBody=true", "mock:result");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOnlyTopicProducerTest extends JmsTestSupport {
+    
+    private static final String TEST_DESTINATION_NAME = "test.foo.topic";
+    
+    public InOnlyTopicProducerTest() {
+    }
+    
+    @Override
+    protected boolean useJmx() {
+        return false;
+    }
+
+    @Test
+    public void testInOnlyTopicProducerProducer() throws Exception {
+        MessageConsumer mc = JmsObjectFactory.createTopicConsumer(getSession(), TEST_DESTINATION_NAME, null);
+        assertNotNull(mc);
+        final String expectedBody = "Hello World!";
+        MockEndpoint mock = getMockEndpoint("mock:result");
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(expectedBody);
+
+        template.sendBody("direct:start", expectedBody);
+        Message message = mc.receive(5000);
+        assertNotNull(message);
+        assertTrue(message instanceof TextMessage);
+        
+        TextMessage tm = (TextMessage) message;
+        String text = tm.getText();
+        assertNotNull(text);
+        
+        template.sendBody("direct:finish", text);
+        
+        mock.assertIsSatisfied();
+        mc.close();
+
+    }
+
+    /*
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+     *
+     * @return
+     * @throws Exception
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .to("sjms:topic:" + TEST_DESTINATION_NAME);
+                
+                from("direct:finish")
+                    .to("log:test.log.1?showBody=true", "mock:result");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOutQueueProducerAsyncLoadTest extends JmsTestSupport {
+    
+    private static final String TEST_DESTINATION_NAME = "in.out.queue.producer.test";
+    private MessageConsumer mc1;
+    private MessageConsumer mc2;
+
+    public InOutQueueProducerAsyncLoadTest() {
+    }
+    
+    @Override
+    protected boolean useJmx() {
+        return false;
+    }
+    
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        mc1 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request");
+        mc2 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request");
+        mc1.setMessageListener(new MyMessageListener());
+        mc2.setMessageListener(new MyMessageListener());
+    }
+    
+    @Override
+    public void tearDown() throws Exception {
+        MyMessageListener l1 = (MyMessageListener)mc1.getMessageListener();
+        l1.close();
+        mc1.close();
+        MyMessageListener l2 = (MyMessageListener)mc2.getMessageListener();
+        l2.close();
+        mc2.close();
+        super.tearDown();
+    }
+
+    /**
+     * Test to verify that when using the consumer listener for the InOut
+     * producer we get the correct message back.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testInOutQueueProducer() throws Exception {
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+
+        for (int i = 1; i <= 5000; i++) {
+            final int tempI = i;
+            Runnable worker = new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        final String requestText = "Message " + tempI;
+                        final String responseText = "Response Message " + tempI;
+                        String response = template.requestBody("direct:start", requestText, String.class);
+                        assertNotNull(response);
+                        assertEquals(responseText, response);
+                    } catch (Exception e) {
+                        log.error("TODO Auto-generated catch block", e);
+                    }
+                }
+            };
+            executor.execute(worker);
+        }
+        while (context.getInflightRepository().size() > 0) {
+
+        }
+        executor.shutdown();
+        while (!executor.isTerminated()) {
+            //
+        }
+    }
+
+    /*
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+     * 
+     * @return
+     * 
+     * @throws Exception
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .to("log:" + TEST_DESTINATION_NAME + ".in.log?showBody=true")
+                    .inOut("sjms:queue:" + TEST_DESTINATION_NAME + ".request" + "?namedReplyTo="
+                               + TEST_DESTINATION_NAME
+                               + ".response&consumerCount=10&producerCount=20&synchronous=false").threads(20)
+                    .to("log:" + TEST_DESTINATION_NAME + ".out.log?showBody=true");
+            }
+        };
+    }
+
+    protected class MyMessageListener implements MessageListener {
+        private MessageProducer mp;
+
+        public MyMessageListener() {
+            super();
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                TextMessage request = (TextMessage)message;
+                String text = request.getText();
+
+                TextMessage response = getSession().createTextMessage();
+                response.setText("Response " + text);
+                response.setJMSCorrelationID(request.getJMSCorrelationID());
+                if (mp == null) {
+                    mp = getSession().createProducer(message.getJMSReplyTo());
+                }
+                mp.send(response);
+            } catch (JMSException e) {
+                fail(e.getLocalizedMessage());
+            }
+        }
+
+        public void close() throws JMSException {
+            mp.close();
+        }
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOutQueueProducerSyncLoadTest extends JmsTestSupport {
+    
+    private static final String TEST_DESTINATION_NAME = "in.out.queue.producer.test";
+    private MessageConsumer mc1;
+    private MessageConsumer mc2;
+
+    public InOutQueueProducerSyncLoadTest() {
+    }
+    
+    @Override
+    protected boolean useJmx() {
+        return false;
+    }
+    
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        mc1 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request");
+        mc2 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request");
+        mc1.setMessageListener(new MyMessageListener());
+        mc2.setMessageListener(new MyMessageListener());
+    }
+    
+    @Override
+    public void tearDown() throws Exception {
+        MyMessageListener l1 = (MyMessageListener)mc1.getMessageListener();
+        l1.close();
+        mc1.close();
+        MyMessageListener l2 = (MyMessageListener)mc2.getMessageListener();
+        l2.close();
+        mc2.close();
+        super.tearDown();
+    }
+
+    /**
+     * Test to verify that when using the consumer listener for the InOut
+     * producer we get the correct message back.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testInOutQueueProducer() throws Exception {
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+
+        for (int i = 1; i <= 5000; i++) {
+            final int tempI = i;
+            Runnable worker = new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        final String requestText = "Message " + tempI;
+                        final String responseText = "Response Message " + tempI;
+                        String response = template.requestBody("direct:start", requestText, String.class);
+                        assertNotNull(response);
+                        assertEquals(responseText, response);
+                    } catch (Exception e) {
+                        log.error("TODO Auto-generated catch block", e);
+                    }
+                }
+            };
+            executor.execute(worker);
+        }
+        while (context.getInflightRepository().size() > 0) {
+
+        }
+        executor.shutdown();
+        while (!executor.isTerminated()) {
+            //
+        }
+    }
+
+    /*
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+     * 
+     * @return
+     * 
+     * @throws Exception
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .to("log:" + TEST_DESTINATION_NAME + ".in.log?showBody=true")
+                    .inOut("sjms:queue:" + TEST_DESTINATION_NAME + ".request" + "?namedReplyTo="
+                               + TEST_DESTINATION_NAME
+                               + ".response&consumerCount=20&producerCount=40&synchronous=true")
+                    .to("log:" + TEST_DESTINATION_NAME + ".out.log?showBody=true");
+            }
+        };
+    }
+
+    protected class MyMessageListener implements MessageListener {
+        private MessageProducer mp;
+
+        public MyMessageListener() {
+            super();
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                TextMessage request = (TextMessage)message;
+                String text = request.getText();
+
+                TextMessage response = getSession().createTextMessage();
+                response.setText("Response " + text);
+                response.setJMSCorrelationID(request.getJMSCorrelationID());
+                if (mp == null) {
+                    mp = getSession().createProducer(message.getJMSReplyTo());
+                }
+                mp.send(response);
+            } catch (JMSException e) {
+                fail(e.getLocalizedMessage());
+            }
+        }
+
+        public void close() throws JMSException {
+            mp.close();
+        }
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.UUID;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOutQueueProducerTest extends JmsTestSupport {
+    
+    private static final String TEST_DESTINATION_NAME = "in.out.queue.producer.test";
+    
+    public InOutQueueProducerTest() {
+    }
+    
+    @Override
+    protected boolean useJmx() {
+        return false;
+    }
+
+    @Test
+    public void testInOutQueueProducer() throws Exception {
+        MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME
+                                                                                + ".request");
+        assertNotNull(mc);
+        final String requestText = "Hello World!";
+        final String responseText = "How are you";
+        mc.setMessageListener(new MyMessageListener(requestText, responseText));
+        Object responseObject = template.requestBody("direct:start", requestText);
+        assertNotNull(responseObject);
+        assertTrue(responseObject instanceof String);
+        assertEquals(responseText, responseObject);
+        mc.close();
+
+    }
+
+    @Test
+    public void testInOutQueueProducerWithCorrelationId() throws Exception {
+        MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME
+                                                                                + ".request");
+        assertNotNull(mc);
+        final String requestText = "Hello World!";
+        final String responseText = "How are you";
+        mc.setMessageListener(new MyMessageListener(requestText, responseText));
+        final String correlationId = UUID.randomUUID().toString().replace("-", "");
+        Exchange exchange = template.request("direct:start", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getOut().setBody(requestText);
+                exchange.getOut().setHeader("JMSCorrelationID", correlationId);
+            }
+        });
+        assertNotNull(exchange);
+        assertTrue(exchange.getIn().getBody() instanceof String);
+        assertEquals(responseText, exchange.getIn().getBody());
+        assertEquals(correlationId, exchange.getIn().getHeader("JMSCorrelationID", String.class));
+        mc.close();
+
+    }
+
+    /*
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+     * 
+     * @return
+     * 
+     * @throws Exception
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .to("log:" + TEST_DESTINATION_NAME + ".in.log.1?showBody=true")
+                    .inOut("sjms:queue:" + TEST_DESTINATION_NAME + ".request" + "?namedReplyTo="
+                               + TEST_DESTINATION_NAME + ".response")
+                    .to("log:" + TEST_DESTINATION_NAME + ".out.log.1?showBody=true");
+            }
+        };
+    }
+
+    protected class MyMessageListener implements MessageListener {
+        private String requestText;
+        private String responseText;
+
+        /**
+         * TODO Add Constructor Javadoc
+         * 
+         * @param request
+         * @param response
+         */
+        public MyMessageListener(String request, String response) {
+            super();
+            this.requestText = request;
+            this.responseText = response;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                TextMessage request = (TextMessage)message;
+                assertNotNull(request);
+                String text = request.getText();
+                assertEquals(requestText, text);
+
+                TextMessage response = getSession().createTextMessage();
+                response.setText(responseText);
+                response.setJMSCorrelationID(request.getJMSCorrelationID());
+                MessageProducer mp = getSession().createProducer(message.getJMSReplyTo());
+                mp.send(response);
+                mp.close();
+            } catch (JMSException e) {
+                fail(e.getLocalizedMessage());
+            }
+        }
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.UUID;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOutTempQueueProducerTest extends JmsTestSupport {
+    
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
+    @Test
+    public void testInOutQueueProducer() throws Exception {
+        String queueName = "in.out.queue.producer.test.request";
+        MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), queueName);
+        assertNotNull(mc);
+        final String requestText = "Hello World!";
+        final String responseText = "How are you";
+        mc.setMessageListener(new MyMessageListener(requestText, responseText));
+        Object responseObject = template.requestBody("sjms:queue:" + queueName + "?exchangePattern=InOut", requestText);
+        assertNotNull(responseObject);
+        assertTrue(responseObject instanceof String);
+        assertEquals(responseText, responseObject);
+        mc.close();
+
+    }
+
+    @Test
+    public void testInOutQueueProducerWithCorrelationId() throws Exception {
+        String queueName = "in.out.queue.producer.test.request";
+        MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), queueName);
+        assertNotNull(mc);
+        final String requestText = "Hello World!";
+        final String responseText = "How are you";
+        mc.setMessageListener(new MyMessageListener(requestText, responseText));
+        final String correlationId = UUID.randomUUID().toString().replace("-", "");
+        Exchange exchange = template.request("sjms:queue:" + queueName + "?exchangePattern=InOut", new Processor() {
+            
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody(requestText);
+                exchange.getIn().setHeader("JMSCorrelationID", correlationId);
+            }
+        });
+        assertNotNull(exchange);
+        assertTrue(exchange.getIn().getBody() instanceof String);
+        assertEquals(responseText, exchange.getOut().getBody());
+        assertEquals(correlationId, exchange.getOut().getHeader("JMSCorrelationID", String.class));
+        mc.close();
+
+    }
+    
+    protected class MyMessageListener implements MessageListener {
+        private String requestText;
+        private String responseText;
+        
+        /**
+         * TODO Add Constructor Javadoc
+         *
+         * @param request
+         * @param response
+         */
+        public MyMessageListener(String request, String response) {
+            super();
+            this.requestText = request;
+            this.responseText = response;
+        }
+        
+        @Override
+        public void onMessage(Message message) {
+            try {
+                TextMessage request = (TextMessage) message;
+                assertNotNull(request);
+                String text = request.getText();
+                assertEquals(requestText, text);
+                
+                TextMessage response = getSession().createTextMessage();
+                response.setText(responseText);
+                response.setJMSCorrelationID(request.getJMSCorrelationID());
+                MessageProducer mp = getSession().createProducer(message.getJMSReplyTo());
+                mp.send(response);
+                mp.close();
+            } catch (JMSException e) {
+                fail(e.getLocalizedMessage());
+            }
+        }
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class QueueProducerTest extends JmsTestSupport {
+    
+    private static final String TEST_DESTINATION_NAME = "test.foo";
+    
+    public QueueProducerTest() {
+    }
+    
+    @Override
+    protected boolean useJmx() {
+        return false;
+    }
+
+    @Test
+    public void testQueueProducer() throws Exception {
+        MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME);
+        assertNotNull(mc);
+        final String expectedBody = "Hello World!";
+        MockEndpoint mock = getMockEndpoint("mock:result");
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(expectedBody);
+
+        template.sendBody("direct:start", expectedBody);
+        Message message = mc.receive(5000);
+        assertNotNull(message);
+        assertTrue(message instanceof TextMessage);
+        
+        TextMessage tm = (TextMessage) message;
+        String text = tm.getText();
+        assertNotNull(text);
+        
+        template.sendBody("direct:finish", text);
+        
+        mock.assertIsSatisfied();
+        mc.close();
+
+    }
+
+    /*
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+     *
+     * @return
+     * @throws Exception
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .to("sjms:queue:" + TEST_DESTINATION_NAME);
+                
+                from("direct:finish")
+                    .to("log:test.log.1?showBody=true", "mock:result");
+            }
+        };
+    }
+}