You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/27 17:02:45 UTC
svn commit: r1331455 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/
test/java/org/apache/camel/component/jms/
Author: davsclaus
Date: Fri Apr 27 15:02:45 2012
New Revision: 1331455
URL: http://svn.apache.org/viewvc?rev=1331455&view=rev
Log:
CAMEL-4309: Added option asyncStopListener to camel-jms to stop the JMS listener container asynchronously using a separate thread
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java
- copied, changed from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java
- copied, changed from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1331455&r1=1331454&r2=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Fri Apr 27 15:02:45 2012
@@ -54,7 +54,7 @@ public class JmsComponent extends Defaul
private ApplicationContext applicationContext;
private QueueBrowseStrategy queueBrowseStrategy;
private HeaderFilterStrategy headerFilterStrategy = new JmsHeaderFilterStrategy();
- private ExecutorService asyncStartExecutorService;
+ private ExecutorService asyncStartStopExecutorService;
public JmsComponent() {
}
@@ -324,6 +324,10 @@ public class JmsComponent extends Defaul
getConfiguration().setAsyncStartListener(asyncStartListener);
}
+ public void setAsyncStopListener(boolean asyncStopListener) {
+ getConfiguration().setAsyncStopListener(asyncStopListener);
+ }
+
public void setForceSendOriginalMessage(boolean forceSendOriginalMessage) {
getConfiguration().setForceSendOriginalMessage(forceSendOriginalMessage);
}
@@ -404,20 +408,20 @@ public class JmsComponent extends Defaul
@Override
protected void doShutdown() throws Exception {
- if (asyncStartExecutorService != null) {
- getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartExecutorService);
- asyncStartExecutorService = null;
+ if (asyncStartStopExecutorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
+ asyncStartStopExecutorService = null;
}
super.doShutdown();
}
- protected synchronized ExecutorService getAsyncStartExecutorService() {
- if (asyncStartExecutorService == null) {
+ protected synchronized ExecutorService getAsyncStartStopExecutorService() {
+ if (asyncStartStopExecutorService == null) {
// use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread
// for each task, and the thread pool will shrink when no more tasks running
- asyncStartExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartListener");
+ asyncStartStopExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener");
}
- return asyncStartExecutorService;
+ return asyncStartStopExecutorService;
}
@Override
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1331455&r1=1331454&r2=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Fri Apr 27 15:02:45 2012
@@ -124,7 +124,8 @@ public class JmsConfiguration implements
private boolean transferException;
private boolean testConnectionOnStartup;
private boolean asyncStartListener;
- // if the message is a JmsMessage and mapJmsMessage=false, force the
+ private boolean asyncStopListener;
+ // if the message is a JmsMessage and mapJmsMessage=false, force the
// producer to send the javax.jms.Message body to the next JMS destination
private boolean forceSendOriginalMessage;
// to force disabling time to live (works in both in-only or in-out mode)
@@ -1200,6 +1201,14 @@ public class JmsConfiguration implements
this.asyncStartListener = asyncStartListener;
}
+ public boolean isAsyncStopListener() {
+ return asyncStopListener;
+ }
+
+ public void setAsyncStopListener(boolean asyncStopListener) {
+ this.asyncStopListener = asyncStopListener;
+ }
+
public boolean isTestConnectionOnStartup() {
return testConnectionOnStartup;
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1331455&r1=1331454&r2=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Fri Apr 27 15:02:45 2012
@@ -115,7 +115,7 @@ public class JmsConsumer extends Default
}
if (getEndpoint().getConfiguration().isAsyncStartListener()) {
- getEndpoint().getAsyncStartExecutorService().submit(new Runnable() {
+ getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
@Override
public void run() {
try {
@@ -151,19 +151,42 @@ public class JmsConsumer extends Default
}
}
- @Override
- protected void doStop() throws Exception {
+ protected void stopAndDestroyListenerContainer() {
if (listenerContainer != null) {
listenerContainer.stop();
listenerContainer.destroy();
- // TODO: The async destroy code does not work well see https://issues.apache.org/jira/browse/CAMEL-4309
- // getEndpoint().destroyMessageListenerContainer(listenerContainer);
}
-
// null container and listener so they are fully re created if this consumer is restarted
// then we will use updated configuration from jms endpoint that may have been managed using JMX
listenerContainer = null;
messageListener = null;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (listenerContainer != null) {
+
+ if (getEndpoint().getConfiguration().isAsyncStopListener()) {
+ getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ stopAndDestroyListenerContainer();
+ } catch (Throwable e) {
+ log.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncStopListenerTask[" + getDestinationName() + "]";
+ }
+ });
+ } else {
+ stopAndDestroyListenerContainer();
+ }
+ }
+
super.doStop();
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1331455&r1=1331454&r2=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Fri Apr 27 15:02:45 2012
@@ -469,12 +469,12 @@ public class JmsEndpoint extends Default
return replyManagerExecutorService;
}
- protected ExecutorService getAsyncStartExecutorService() {
+ protected ExecutorService getAsyncStartStopExecutorService() {
if (getComponent() == null) {
- throw new IllegalStateException("AsyncStartListener requires JmsComponent to be configured on this endpoint: " + this);
+ throw new IllegalStateException("AsyncStartStopListener requires JmsComponent to be configured on this endpoint: " + this);
}
// use shared thread pool from component
- return getComponent().getAsyncStartExecutorService();
+ return getComponent().getAsyncStartStopExecutorService();
}
/**
@@ -1096,6 +1096,16 @@ public class JmsEndpoint extends Default
}
@ManagedAttribute
+ public void setAsyncStopListener(boolean asyncStopListener) {
+ configuration.setAsyncStopListener(asyncStopListener);
+ }
+
+ @ManagedAttribute
+ public boolean isAsyncStopListener() {
+ return configuration.isAsyncStopListener();
+ }
+
+ @ManagedAttribute
public String getReplyToType() {
if (configuration.getReplyToType() != null) {
return configuration.getReplyToType().name();
Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java (from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java&r1=1331369&r2=1331455&rev=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java Fri Apr 27 15:02:45 2012
@@ -31,7 +31,7 @@ import static org.apache.camel.component
/**
* Testing with async start listener
*/
-public class JmsAsyncStartListenerTest extends CamelTestSupport {
+public class JmsAsyncStartStopListenerTest extends CamelTestSupport {
protected String componentName = "activemq";
@@ -54,6 +54,7 @@ public class JmsAsyncStartListenerTest e
ConnectionFactory connectionFactory = CamelJmsTestHelper.createPersistentConnectionFactory();
JmsComponent jms = jmsComponentAutoAcknowledge(connectionFactory);
jms.setAsyncStartListener(true);
+ jms.setAsyncStopListener(true);
camelContext.addComponent(componentName, jms);
return camelContext;
Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java (from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java&r1=1331369&r2=1331455&rev=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java Fri Apr 27 15:02:45 2012
@@ -29,14 +29,14 @@ import org.junit.Test;
import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
/**
- * Testing with async start listener
+ * Testing with async stop listener
*/
-public class JmsAsyncStartListenerTest extends CamelTestSupport {
+public class JmsAsyncStopListenerTest extends CamelTestSupport {
protected String componentName = "activemq";
@Test
- public void testAsyncStartListener() throws Exception {
+ public void testAsyncStopListener() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(2);
@@ -53,7 +53,7 @@ public class JmsAsyncStartListenerTest e
// so we need a persistent store in case no active consumers when we send the messages
ConnectionFactory connectionFactory = CamelJmsTestHelper.createPersistentConnectionFactory();
JmsComponent jms = jmsComponentAutoAcknowledge(connectionFactory);
- jms.setAsyncStartListener(true);
+ jms.setAsyncStopListener(true);
camelContext.addComponent(componentName, jms);
return camelContext;