You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/03/10 13:59:16 UTC

svn commit: r1299191 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/

Author: davsclaus
Date: Sat Mar 10 12:59:16 2012
New Revision: 1299191

URL: http://svn.apache.org/viewvc?rev=1299191&view=rev
Log:
CAMEL-4770: Added asyncStartListener option to JMS. To start jms consumer async. Thanks to Michael Warecki for partly patch.

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java
      - copied, changed from r1299156, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1299191&r1=1299190&r2=1299191&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Sat Mar 10 12:59:16 2012
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import javax.jms.ConnectionFactory;
 import javax.jms.ExceptionListener;
 import javax.jms.Session;
@@ -53,6 +54,7 @@ public class JmsComponent extends Defaul
     private ApplicationContext applicationContext;
     private QueueBrowseStrategy queueBrowseStrategy;
     private HeaderFilterStrategy headerFilterStrategy = new JmsHeaderFilterStrategy();
+    private ExecutorService asyncStartExecutorService;
 
     public JmsComponent() {
     }
@@ -318,6 +320,10 @@ public class JmsComponent extends Defaul
         getConfiguration().setTestConnectionOnStartup(testConnectionOnStartup);
     }
 
+    public void setAsyncStartListener(boolean asyncStartListener) {
+        getConfiguration().setAsyncStartListener(asyncStartListener);
+    }
+
     public void setForceSendOriginalMessage(boolean forceSendOriginalMessage) {
         getConfiguration().setForceSendOriginalMessage(forceSendOriginalMessage);
     }
@@ -393,8 +399,21 @@ public class JmsComponent extends Defaul
     // -------------------------------------------------------------------------
 
     @Override
-    protected void doStop() throws Exception {
-        super.doStop();
+    protected void doShutdown() throws Exception {
+        if (asyncStartExecutorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartExecutorService);
+            asyncStartExecutorService = null;
+        }
+        super.doShutdown();
+    }
+
+    protected synchronized ExecutorService getAsyncStartExecutorService() {
+        if (asyncStartExecutorService == null) {
+            // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread
+            // for each task, and the thread pool will shrink when no more tasks running
+            asyncStartExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartListener");
+        }
+        return asyncStartExecutorService;
     }
 
     @Override
@@ -423,7 +442,7 @@ public class JmsComponent extends Defaul
 
         // lets make sure we copy the configuration as each endpoint can
         // customize its own version
-        JmsConfiguration newConfiguration = getConfiguration().copy();        
+        JmsConfiguration newConfiguration = getConfiguration().copy();
         JmsEndpoint endpoint;
         if (pubSubDomain) {
             if (tempDestination) {
@@ -473,12 +492,12 @@ public class JmsComponent extends Defaul
             endpoint.setJmsKeyFormatStrategy(resolveAndRemoveReferenceParameter(
                     parameters, KEY_FORMAT_STRATEGY_PARAM, JmsKeyFormatStrategy.class));
         }
-        
+
         setProperties(endpoint.getConfiguration(), parameters);
         endpoint.setHeaderFilterStrategy(getHeaderFilterStrategy());
 
         return endpoint;
-    }   
+    }
 
     /**
      * A strategy method allowing the URI destination to be translated into the

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1299191&r1=1299190&r2=1299191&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Sat Mar 10 12:59:16 2012
@@ -122,6 +122,7 @@ public class JmsConfiguration implements
     private boolean transferExchange;
     private boolean transferException;
     private boolean testConnectionOnStartup;
+    private boolean asyncStartListener;
     // if the message is a JmsMessage and mapJmsMessage=false, force the 
     // producer to send the javax.jms.Message body to the next JMS destination    
     private boolean forceSendOriginalMessage;
@@ -1179,6 +1180,14 @@ public class JmsConfiguration implements
         this.transferException = transferException;
     }
 
+    public boolean isAsyncStartListener() {
+        return asyncStartListener;
+    }
+
+    public void setAsyncStartListener(boolean asyncStartListener) {
+        this.asyncStartListener = asyncStartListener;
+    }
+
     public boolean isTestConnectionOnStartup() {
         return testConnectionOnStartup;
     }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1299191&r1=1299190&r2=1299191&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Sat Mar 10 12:59:16 2012
@@ -34,8 +34,8 @@ import org.springframework.jms.support.J
  * @see SimpleJmsMessageListenerContainer
  */
 public class JmsConsumer extends DefaultConsumer implements SuspendableService {
-    private AbstractMessageListenerContainer listenerContainer;
-    private EndpointMessageListener messageListener;
+    private volatile AbstractMessageListenerContainer listenerContainer;
+    private volatile EndpointMessageListener messageListener;
     private volatile boolean initialized;
 
     public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) {
@@ -81,7 +81,9 @@ public class JmsConsumer extends Default
      * Can be used to start this consumer later if it was configured to not auto startup.
      */
     public void startListenerContainer() {
+        log.trace("Starting listener container {} on destination {}", listenerContainer, getDestinationName());
         listenerContainer.start();
+        log.debug("Started listener container {} on destination {}", listenerContainer, getDestinationName());
     }
 
     /**
@@ -96,7 +98,6 @@ public class JmsConsumer extends Default
             log.debug("Testing JMS Connection on startup for destination: {}", getDestinationName());
             Connection con = listenerContainer.getConnectionFactory().createConnection();
             JmsUtils.closeConnection(con);
-
             log.debug("Successfully tested JMS Connection on startup for destination: {}", getDestinationName());
         } catch (Exception e) {
             String msg = "Cannot get JMS Connection on startup for destination " + getDestinationName();
@@ -112,7 +113,32 @@ public class JmsConsumer extends Default
         if (listenerContainer == null) {
             createMessageListenerContainer();
         }
+        
+        if (getEndpoint().getConfiguration().isAsyncStartListener()) {
+            getEndpoint().getAsyncStartExecutorService().submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        prepareAndStartListenerContainer();
+                    } catch (Throwable e) {
+                        log.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "AsyncStartListenerTask[" + getDestinationName() + "]";
+                }
+            });
+        } else {
+            prepareAndStartListenerContainer();
+        }
 
+        // mark as initialized for the first time
+        initialized = true;
+    }
+    
+    protected void prepareAndStartListenerContainer() {
         listenerContainer.afterPropertiesSet();
 
         // only start listener if auto start is enabled or we are explicit invoking start later
@@ -123,9 +149,6 @@ public class JmsConsumer extends Default
             }
             startListenerContainer();
         }
-
-        // mark as initialized for the first time
-        initialized = true;
     }
 
     @Override

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1299191&r1=1299190&r2=1299191&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Sat Mar 10 12:59:16 2012
@@ -125,7 +125,6 @@ public class JmsEndpoint extends Default
         this(UnsafeUriCharactersEncoder.encode(endpointUri), destinationName, true);
     }
 
-
     /**
      * Returns a new JMS endpoint for the given JMS destination using the configuration from the given JMS component
      */
@@ -307,6 +306,12 @@ public class JmsEndpoint extends Default
 
     // Properties
     // -------------------------------------------------------------------------
+
+    @Override
+    public JmsComponent getComponent() {
+        return (JmsComponent) super.getComponent();
+    }
+
     public HeaderFilterStrategy getHeaderFilterStrategy() {
         if (headerFilterStrategy == null) {
             headerFilterStrategy = new JmsHeaderFilterStrategy();
@@ -463,6 +468,14 @@ public class JmsEndpoint extends Default
         }
         return replyManagerExecutorService;
     }
+    
+    protected ExecutorService getAsyncStartExecutorService() {
+        if (getComponent() == null) {
+            throw new IllegalStateException("AsyncStartListener requires JmsComponent to be configured on this endpoint: " + this);
+        }
+        // use shared thread pool from component
+        return getComponent().getAsyncStartExecutorService();
+    }
 
     /**
      * State whether this endpoint is running (eg started)
@@ -491,6 +504,11 @@ public class JmsEndpoint extends Default
             }
             replyToReplyManager.clear();
         }
+
+        if (replyManagerExecutorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(replyManagerExecutorService);
+            replyManagerExecutorService = null;
+        }
     }
 
     // Delegated properties from the configuration
@@ -1063,6 +1081,16 @@ public class JmsEndpoint extends Default
     }
 
     @ManagedAttribute
+    public void setAsyncStartListener(boolean asyncStartListener) {
+        configuration.setAsyncStartListener(asyncStartListener);
+    }
+
+    @ManagedAttribute
+    public boolean isAsyncStartListener() {
+        return configuration.isAsyncStartListener();
+    }
+
+    @ManagedAttribute
     public String getReplyToType() {
         if (configuration.getReplyToType() != null) {
             return configuration.getReplyToType().name();

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java (from r1299156, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java&r1=1299156&r2=1299191&rev=1299191&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java Sat Mar 10 12:59:16 2012
@@ -20,43 +20,23 @@ import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
+
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
 
 /**
- * A simple request / reply test
+ * Testing with async start listener
  */
-public class JmsSimpleRequestReplyTest extends CamelTestSupport {
+public class JmsAsyncStartListenerTest extends CamelTestSupport {
 
     protected String componentName = "activemq";
 
     @Test
-    public void testRequetReply() throws Exception {
-        MockEndpoint result = getMockEndpoint("mock:result");
-        result.expectedMessageCount(1);
-
-        Exchange out = template.send("activemq:queue:hello", ExchangePattern.InOut, new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                exchange.getIn().setBody("Hello World");
-                exchange.getIn().setHeader("foo", 123);
-            }
-        });
-
-        result.assertIsSatisfied();
-
-        assertNotNull(out);
-
-        assertEquals("Bye World", out.getOut().getBody(String.class));
-        assertEquals(123, out.getOut().getHeader("foo"));
-    }
-
-    @Test
-    public void testRequetReply2Messages() throws Exception {
+    public void testAsyncStartListener() throws Exception {
         MockEndpoint result = getMockEndpoint("mock:result");
         result.expectedMessageCount(2);
 
@@ -69,8 +49,12 @@ public class JmsSimpleRequestReplyTest e
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
 
-        ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
-        camelContext.addComponent(componentName, jmsComponentAutoAcknowledge(connectionFactory));
+        // use a persistent queue as the consumer is started asynchronously
+        // so we need a persistent store in case no active consumers when we send the messages
+        ConnectionFactory connectionFactory = CamelJmsTestHelper.createPersistentConnectionFactory();
+        JmsComponent jms = jmsComponentAutoAcknowledge(connectionFactory);
+        jms.setAsyncStartListener(true);
+        camelContext.addComponent(componentName, jms);
 
         return camelContext;
     }
@@ -81,7 +65,6 @@ public class JmsSimpleRequestReplyTest e
                 from("activemq:queue:hello").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         exchange.getIn().setBody("Bye World");
-                        assertNotNull(exchange.getIn().getHeader("JMSReplyTo"));
                     }
                 }).to("mock:result");
             }