You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/03/10 13:59:16 UTC
svn commit: r1299191 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/
test/java/org/apache/camel/component/jms/
Author: davsclaus
Date: Sat Mar 10 12:59:16 2012
New Revision: 1299191
URL: http://svn.apache.org/viewvc?rev=1299191&view=rev
Log:
CAMEL-4770: Added asyncStartListener option to JMS. To start jms consumer async. Thanks to Michael Warecki for partly patch.
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java
- copied, changed from r1299156, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1299191&r1=1299190&r2=1299191&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Sat Mar 10 12:59:16 2012
@@ -17,6 +17,7 @@
package org.apache.camel.component.jms;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.Session;
@@ -53,6 +54,7 @@ public class JmsComponent extends Defaul
private ApplicationContext applicationContext;
private QueueBrowseStrategy queueBrowseStrategy;
private HeaderFilterStrategy headerFilterStrategy = new JmsHeaderFilterStrategy();
+ private ExecutorService asyncStartExecutorService;
public JmsComponent() {
}
@@ -318,6 +320,10 @@ public class JmsComponent extends Defaul
getConfiguration().setTestConnectionOnStartup(testConnectionOnStartup);
}
+ public void setAsyncStartListener(boolean asyncStartListener) {
+ getConfiguration().setAsyncStartListener(asyncStartListener);
+ }
+
public void setForceSendOriginalMessage(boolean forceSendOriginalMessage) {
getConfiguration().setForceSendOriginalMessage(forceSendOriginalMessage);
}
@@ -393,8 +399,21 @@ public class JmsComponent extends Defaul
// -------------------------------------------------------------------------
@Override
- protected void doStop() throws Exception {
- super.doStop();
+ protected void doShutdown() throws Exception {
+ if (asyncStartExecutorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartExecutorService);
+ asyncStartExecutorService = null;
+ }
+ super.doShutdown();
+ }
+
+ protected synchronized ExecutorService getAsyncStartExecutorService() {
+ if (asyncStartExecutorService == null) {
+ // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread
+ // for each task, and the thread pool will shrink when no more tasks running
+ asyncStartExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartListener");
+ }
+ return asyncStartExecutorService;
}
@Override
@@ -423,7 +442,7 @@ public class JmsComponent extends Defaul
// lets make sure we copy the configuration as each endpoint can
// customize its own version
- JmsConfiguration newConfiguration = getConfiguration().copy();
+ JmsConfiguration newConfiguration = getConfiguration().copy();
JmsEndpoint endpoint;
if (pubSubDomain) {
if (tempDestination) {
@@ -473,12 +492,12 @@ public class JmsComponent extends Defaul
endpoint.setJmsKeyFormatStrategy(resolveAndRemoveReferenceParameter(
parameters, KEY_FORMAT_STRATEGY_PARAM, JmsKeyFormatStrategy.class));
}
-
+
setProperties(endpoint.getConfiguration(), parameters);
endpoint.setHeaderFilterStrategy(getHeaderFilterStrategy());
return endpoint;
- }
+ }
/**
* A strategy method allowing the URI destination to be translated into the
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1299191&r1=1299190&r2=1299191&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Sat Mar 10 12:59:16 2012
@@ -122,6 +122,7 @@ public class JmsConfiguration implements
private boolean transferExchange;
private boolean transferException;
private boolean testConnectionOnStartup;
+ private boolean asyncStartListener;
// if the message is a JmsMessage and mapJmsMessage=false, force the
// producer to send the javax.jms.Message body to the next JMS destination
private boolean forceSendOriginalMessage;
@@ -1179,6 +1180,14 @@ public class JmsConfiguration implements
this.transferException = transferException;
}
+ public boolean isAsyncStartListener() {
+ return asyncStartListener;
+ }
+
+ public void setAsyncStartListener(boolean asyncStartListener) {
+ this.asyncStartListener = asyncStartListener;
+ }
+
public boolean isTestConnectionOnStartup() {
return testConnectionOnStartup;
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1299191&r1=1299190&r2=1299191&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Sat Mar 10 12:59:16 2012
@@ -34,8 +34,8 @@ import org.springframework.jms.support.J
* @see SimpleJmsMessageListenerContainer
*/
public class JmsConsumer extends DefaultConsumer implements SuspendableService {
- private AbstractMessageListenerContainer listenerContainer;
- private EndpointMessageListener messageListener;
+ private volatile AbstractMessageListenerContainer listenerContainer;
+ private volatile EndpointMessageListener messageListener;
private volatile boolean initialized;
public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) {
@@ -81,7 +81,9 @@ public class JmsConsumer extends Default
* Can be used to start this consumer later if it was configured to not auto startup.
*/
public void startListenerContainer() {
+ log.trace("Starting listener container {} on destination {}", listenerContainer, getDestinationName());
listenerContainer.start();
+ log.debug("Started listener container {} on destination {}", listenerContainer, getDestinationName());
}
/**
@@ -96,7 +98,6 @@ public class JmsConsumer extends Default
log.debug("Testing JMS Connection on startup for destination: {}", getDestinationName());
Connection con = listenerContainer.getConnectionFactory().createConnection();
JmsUtils.closeConnection(con);
-
log.debug("Successfully tested JMS Connection on startup for destination: {}", getDestinationName());
} catch (Exception e) {
String msg = "Cannot get JMS Connection on startup for destination " + getDestinationName();
@@ -112,7 +113,32 @@ public class JmsConsumer extends Default
if (listenerContainer == null) {
createMessageListenerContainer();
}
+
+ if (getEndpoint().getConfiguration().isAsyncStartListener()) {
+ getEndpoint().getAsyncStartExecutorService().submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ prepareAndStartListenerContainer();
+ } catch (Throwable e) {
+ log.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncStartListenerTask[" + getDestinationName() + "]";
+ }
+ });
+ } else {
+ prepareAndStartListenerContainer();
+ }
+ // mark as initialized for the first time
+ initialized = true;
+ }
+
+ protected void prepareAndStartListenerContainer() {
listenerContainer.afterPropertiesSet();
// only start listener if auto start is enabled or we are explicit invoking start later
@@ -123,9 +149,6 @@ public class JmsConsumer extends Default
}
startListenerContainer();
}
-
- // mark as initialized for the first time
- initialized = true;
}
@Override
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1299191&r1=1299190&r2=1299191&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Sat Mar 10 12:59:16 2012
@@ -125,7 +125,6 @@ public class JmsEndpoint extends Default
this(UnsafeUriCharactersEncoder.encode(endpointUri), destinationName, true);
}
-
/**
* Returns a new JMS endpoint for the given JMS destination using the configuration from the given JMS component
*/
@@ -307,6 +306,12 @@ public class JmsEndpoint extends Default
// Properties
// -------------------------------------------------------------------------
+
+ @Override
+ public JmsComponent getComponent() {
+ return (JmsComponent) super.getComponent();
+ }
+
public HeaderFilterStrategy getHeaderFilterStrategy() {
if (headerFilterStrategy == null) {
headerFilterStrategy = new JmsHeaderFilterStrategy();
@@ -463,6 +468,14 @@ public class JmsEndpoint extends Default
}
return replyManagerExecutorService;
}
+
+ protected ExecutorService getAsyncStartExecutorService() {
+ if (getComponent() == null) {
+ throw new IllegalStateException("AsyncStartListener requires JmsComponent to be configured on this endpoint: " + this);
+ }
+ // use shared thread pool from component
+ return getComponent().getAsyncStartExecutorService();
+ }
/**
* State whether this endpoint is running (eg started)
@@ -491,6 +504,11 @@ public class JmsEndpoint extends Default
}
replyToReplyManager.clear();
}
+
+ if (replyManagerExecutorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(replyManagerExecutorService);
+ replyManagerExecutorService = null;
+ }
}
// Delegated properties from the configuration
@@ -1063,6 +1081,16 @@ public class JmsEndpoint extends Default
}
@ManagedAttribute
+ public void setAsyncStartListener(boolean asyncStartListener) {
+ configuration.setAsyncStartListener(asyncStartListener);
+ }
+
+ @ManagedAttribute
+ public boolean isAsyncStartListener() {
+ return configuration.isAsyncStartListener();
+ }
+
+ @ManagedAttribute
public String getReplyToType() {
if (configuration.getReplyToType() != null) {
return configuration.getReplyToType().name();
Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java (from r1299156, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java&r1=1299156&r2=1299191&rev=1299191&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java Sat Mar 10 12:59:16 2012
@@ -20,43 +20,23 @@ import javax.jms.ConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
+
import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
/**
- * A simple request / reply test
+ * Testing with async start listener
*/
-public class JmsSimpleRequestReplyTest extends CamelTestSupport {
+public class JmsAsyncStartListenerTest extends CamelTestSupport {
protected String componentName = "activemq";
@Test
- public void testRequetReply() throws Exception {
- MockEndpoint result = getMockEndpoint("mock:result");
- result.expectedMessageCount(1);
-
- Exchange out = template.send("activemq:queue:hello", ExchangePattern.InOut, new Processor() {
- public void process(Exchange exchange) throws Exception {
- exchange.getIn().setBody("Hello World");
- exchange.getIn().setHeader("foo", 123);
- }
- });
-
- result.assertIsSatisfied();
-
- assertNotNull(out);
-
- assertEquals("Bye World", out.getOut().getBody(String.class));
- assertEquals(123, out.getOut().getHeader("foo"));
- }
-
- @Test
- public void testRequetReply2Messages() throws Exception {
+ public void testAsyncStartListener() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(2);
@@ -69,8 +49,12 @@ public class JmsSimpleRequestReplyTest e
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
- ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
- camelContext.addComponent(componentName, jmsComponentAutoAcknowledge(connectionFactory));
+ // use a persistent queue as the consumer is started asynchronously
+ // so we need a persistent store in case no active consumers when we send the messages
+ ConnectionFactory connectionFactory = CamelJmsTestHelper.createPersistentConnectionFactory();
+ JmsComponent jms = jmsComponentAutoAcknowledge(connectionFactory);
+ jms.setAsyncStartListener(true);
+ camelContext.addComponent(componentName, jms);
return camelContext;
}
@@ -81,7 +65,6 @@ public class JmsSimpleRequestReplyTest e
from("activemq:queue:hello").process(new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody("Bye World");
- assertNotNull(exchange.getIn().getHeader("JMSReplyTo"));
}
}).to("mock:result");
}