You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by su...@apache.org on 2012/09/26 20:51:54 UTC

svn commit: r1390665 - in /camel/trunk/components/camel-sjms/src: main/java/org/apache/camel/component/sjms/consumer/ main/java/org/apache/camel/component/sjms/producer/ test/java/org/apache/camel/component/sjms/it/ test/java/org/apache/camel/component...

Author: sully6768
Date: Wed Sep 26 18:51:53 2012
New Revision: 1390665

URL: http://svn.apache.org/viewvc?rev=1390665&view=rev
Log:
Create ConnectionResource integration test and clean up current ITests.

Added:
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java
Modified:
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
    camel/trunk/components/camel-sjms/src/test/resources/log4j.properties

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java Wed Sep 26 18:51:53 2012
@@ -63,9 +63,9 @@ public class DefaultConsumer extends Sjm
         protected void destroyObject(MessageConsumerResources model) throws Exception {
             if (model != null) {
                 if (model.getMessageConsumer() != null) {
-                    if (model.getMessageConsumer().getMessageListener() != null) {
-                        model.getMessageConsumer().setMessageListener(null);
-                    }
+//                    if (model.getMessageConsumer().getMessageListener() != null) {
+//                        model.getMessageConsumer().setMessageListener(null);
+//                    }
                     model.getMessageConsumer().close();
                 }
 

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java Wed Sep 26 18:51:53 2012
@@ -98,8 +98,10 @@ public class InOutProducer extends SjmsP
 
                 @Override
                 public void onMessage(Message message) {
-                    logger.info("Message Received in the Consumer Pool");
-                    logger.info("  Message : {}", message);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Message Received in the Consumer Pool");
+                        logger.debug("  Message : {}", message);
+                    }
                     try {
                         Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID());
                         exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java Wed Sep 26 18:51:53 2012
@@ -26,7 +26,9 @@ import org.apache.camel.util.StopWatch;
 import org.junit.Test;
 
 /**
- * @version 
+ * Integration test that verifies the ability of SJMS to correctly process
+ * asynchronous InOut exchanges from both the Producer and Consumer perspective
+ * using a namedReplyTo destination.
  */
 public class AsyncJmsInOutIT extends JmsTestSupport {
 
@@ -53,23 +55,13 @@ public class AsyncJmsInOutIT extends Jms
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
-                // (there are delays in both routes)
-                // however due async routing, we can leverage the fact to let threads non blocked
-                // in the first route, and therefore can have the messages processed faster
-                // because we can have messages wait concurrently in both routes
-                // this means the async processing model is about 2x faster
 
                 from("seda:start")
-                    // we can only send at fastest the 100 msg in 5 sec due the delay
-                    .delay(50)
-                    .to("sjms:queue:bar?synchronous=false&exchangePattern=InOut")
+                    .to("sjms:queue:in.foo?synchronous=false&namedReplyTo=out.bar&exchangePattern=InOut")
                     .to("mock:result");
 
-                from("sjms:queue:bar?synchronous=false&exchangePattern=InOut")
+                from("sjms:queue:in.foo?synchronous=false&exchangePattern=InOut")
                     .log("Using ${threadName} to process ${body}")
-                    // we can only process at fastest the 100 msg in 5 sec due the delay
-                    .delay(50)
                     .transform(body().prepend("Bye "));
             }
         };

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java Wed Sep 26 18:51:53 2012
@@ -26,7 +26,9 @@ import org.apache.camel.util.StopWatch;
 import org.junit.Test;
 
 /**
- * @version 
+ * Integration test that verifies the ability of SJMS to correctly process
+ * asynchronous InOut exchanges from both the Producer and Consumer perspective
+ * using a temporary destination.
  */
 public class AsyncJmsInOutTempDestIT extends JmsTestSupport {
 
@@ -53,23 +55,13 @@ public class AsyncJmsInOutTempDestIT ext
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
-                // (there are delays in both routes)
-                // however due async routing, we can leverage the fact to let threads non blocked
-                // in the first route, and therefore can have the messages processed faster
-                // because we can have messages wait concurrently in both routes
-                // this means the async processing model is about 2x faster
 
                 from("seda:start")
-                    // we can only send at fastest the 100 msg in 5 sec due the delay
-                    .delay(50)
-                    .to("sjms:in.out.temp.queue?exchangePattern=InOut&synchronous=false")
+                    .to("sjms:in.foo.tempQ?synchronous=false&exchangePattern=InOut")
                     .to("mock:result");
 
-                from("sjms:in.out.temp.queue?exchangePattern=InOut&synchronous=false")
+                from("sjms:in.foo.tempQ?synchronous=false&exchangePattern=InOut")
                     .log("Using ${threadName} to process ${body}")
-                    // we can only process at fastest the 100 msg in 5 sec due the delay
-                    .delay(50)
                     .transform(body().prepend("Bye "));
             }
         };

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java?rev=1390665&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java Wed Sep 26 18:51:53 2012
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.it;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.util.StopWatch;
+import org.junit.Test;
+
+/**
+ * Integration test that verifies we can replace the internal
+ * ConnectionFactoryResource with another provider.
+ * 
+ */
+public class ConnectionResourceIT extends JmsTestSupport {
+
+    /**
+     * Test method for
+     * {@link org.apache.camel.component.sjms.jms.ObjectPool#returnObject(java.lang.Object)}
+     * .
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testCreateConnections() throws Exception {
+        ConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
+        assertNotNull(pool);
+        Connection connection = pool.borrowConnection();
+        assertNotNull(connection);
+        assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
+        pool.returnConnection(connection);
+        Connection connection2 = pool.borrowConnection();
+        assertNotNull(connection2);
+    }
+
+    @Test
+    public void testAsynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(100);
+        mock.expectsNoDuplicates(body());
+
+        StopWatch watch = new StopWatch();
+
+        for (int i = 0; i < 100; i++) {
+            template.sendBody("seda:start", "" + i);
+        }
+
+        // just in case we run on slow boxes
+        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
+
+        log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS");
+    }
+
+    /*
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext()
+     * @return
+     * @throws Exception
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = new DefaultCamelContext();
+        AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
+        SjmsComponent component = new SjmsComponent();
+        component.setConnectionResource(pool);
+        camelContext.addComponent("sjms", component);
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from("seda:start")
+                    .to("sjms:queue:in.foo?namedReplyTo=out.bar&exchangePattern=InOut&producerCount=5")
+                    .to("mock:result");
+
+                from("sjms:queue:in.foo?exchangePattern=InOut&consumerCount=20")
+                    .log("Using ${threadName} to process ${body}")
+                    .transform(body().prepend("Bye "));
+            }
+        };
+    }
+
+    public class AMQConnectionResource implements ConnectionResource {
+        private PooledConnectionFactory pcf;
+
+        public AMQConnectionResource(String connectString, int maxConnections) {
+            super();
+            pcf = new PooledConnectionFactory(connectString);
+            pcf.setMaxConnections(maxConnections);
+            pcf.start();
+        }
+
+        public void stop() {
+            pcf.stop();
+        }
+
+        @Override
+        public Connection borrowConnection() throws Exception {
+            Connection answer = pcf.createConnection();
+            answer.start();
+            return answer;
+        }
+
+        @Override
+        public Connection borrowConnection(long timeout) throws Exception {
+            Connection answer = null;
+            int counter = 0;
+            while (counter++ < timeout) {
+                answer = pcf.createConnection();
+                if (answer == null) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        log.info("Intrupted but ignoring: ", e.getMessage());
+                    }
+                } else {
+                    break;
+                }
+            }
+            if (answer != null) {
+                answer.start();
+            }
+            return answer;
+        }
+
+        @Override
+        public void returnConnection(Connection connection) throws Exception {
+            // Do nothing in this case since the PooledConnectionFactory takes
+            // care of this for us
+            log.info("Connection returned");
+        }
+    }
+}

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java Wed Sep 26 18:51:53 2012
@@ -25,8 +25,11 @@ import org.apache.camel.util.StopWatch;
 
 import org.junit.Test;
 
+
 /**
- * @version 
+ * Integration test that verifies the ability of SJMS to correctly process
+ * synchronous InOut exchanges from both the Producer and Consumer perspective
+ * using a namedReplyTo destination.
  */
 public class SyncJmsInOutIT extends JmsTestSupport {
 
@@ -53,23 +56,13 @@ public class SyncJmsInOutIT extends JmsT
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
-                // (there are delays in both routes)
-                // however due async routing, we can leverage the fact to let threads non blocked
-                // in the first route, and therefore can have the messages processed faster
-                // because we can have messages wait concurrently in both routes
-                // this means the async processing model is about 2x faster
 
                 from("seda:start")
-                    // we can only send at fastest the 100 msg in 5 sec due the delay
-                    .delay(50)
-                    .to("sjms:queue:bar?exchangePattern=InOut")
+                    .to("sjms:queue:in.foo?namedReplyTo=out.bar&exchangePattern=InOut")
                     .to("mock:result");
 
-                from("sjms:queue:bar?exchangePattern=InOut")
+                from("sjms:queue:in.foo?exchangePattern=InOut")
                     .log("Using ${threadName} to process ${body}")
-                    // we can only process at fastest the 100 msg in 5 sec due the delay
-                    .delay(50)
                     .transform(body().prepend("Bye "));
             }
         };

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java Wed Sep 26 18:51:53 2012
@@ -25,8 +25,11 @@ import org.apache.camel.util.StopWatch;
 
 import org.junit.Test;
 
+
 /**
- * @version 
+ * Integration test that verifies the ability of SJMS to correctly process
+ * synchronous InOut exchanges from both the Producer and Consumer perspective
+ * using a temporary destination.
  */
 public class SyncJmsInOutTempDestIT extends JmsTestSupport {
 
@@ -53,23 +56,12 @@ public class SyncJmsInOutTempDestIT exte
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
-                // (there are delays in both routes)
-                // however due async routing, we can leverage the fact to let threads non blocked
-                // in the first route, and therefore can have the messages processed faster
-                // because we can have messages wait concurrently in both routes
-                // this means the async processing model is about 2x faster
-
                 from("seda:start")
-                    // we can only send at fastest the 100 msg in 5 sec due the delay
-                    .delay(50)
-                    .to("sjms:in.out.temp.queue?exchangePattern=InOut")
+                    .to("sjms:in.foo.tempQ?exchangePattern=InOut")
                     .to("mock:result");
 
-                from("sjms:in.out.temp.queue?exchangePattern=InOut")
+                from("sjms:in.foo.tempQ?exchangePattern=InOut")
                     .log("Using ${threadName} to process ${body}")
-                    // we can only process at fastest the 100 msg in 5 sec due the delay
-                    .delay(50)
                     .transform(body().prepend("Bye "));
             }
         };

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java Wed Sep 26 18:51:53 2012
@@ -29,8 +29,8 @@ import org.apache.camel.impl.DefaultCame
 import org.apache.camel.test.junit4.CamelTestSupport;
 
 /**
- * TODO Add Class documentation for JmsTestSupport
- *
+ * A support class that builds up and tears down an ActiveMQ instance to be used
+ * for unit testing.
  */
 public class JmsTestSupport extends CamelTestSupport {
     private static final String BROKER_URI = "tcp://localhost:33333";
@@ -40,20 +40,21 @@ public class JmsTestSupport extends Came
     private Connection connection;
     private Session session;
 
+    /** 
+     * Set up the Broker
+     *
+     * @see org.apache.camel.test.junit4.CamelTestSupport#doPreSetup()
+     *
+     * @throws Exception
+     */
     @Override
     protected void doPreSetup() throws Exception {
-        super.doPreSetup();
-    }
-
-    @Override
-    public void setUp() throws Exception {
         broker = new BrokerService();
         broker.setUseJmx(true);
         broker.setPersistent(false);
         broker.deleteAllMessages();
         broker.addConnector(BROKER_URI);
         broker.start();
-        super.setUp();
     }
 
     @Override
@@ -79,10 +80,9 @@ public class JmsTestSupport extends Came
             broker = null;
         }
     }
-    
+
     /*
      * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext()
-     *
      * @return
      * @throws Exception
      */
@@ -99,7 +99,7 @@ public class JmsTestSupport extends Came
         camelContext.addComponent("sjms", component);
         return camelContext;
     }
-    
+
     public void setSession(Session session) {
         this.session = session;
     }

Modified: camel/trunk/components/camel-sjms/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/resources/log4j.properties?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-sjms/src/test/resources/log4j.properties Wed Sep 26 18:51:53 2012
@@ -20,8 +20,8 @@
 log4j.rootLogger=INFO, out
 
 # uncomment the following line to turn on Camel debugging
-log4j.logger.org.apache.activemq=info
-log4j.logger.org.apache.activemq.transaction=trace
+log4j.logger.org.apache.activemq=warn
+log4j.logger.org.apache.activemq.transaction=warn
 log4j.logger.org.apache.camel=info
 log4j.logger.org.apache.camel.converter=info
 log4j.logger.org.apache.camel.component.sjms=info