You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/27 17:02:45 UTC

svn commit: r1331455 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/

Author: davsclaus
Date: Fri Apr 27 15:02:45 2012
New Revision: 1331455

URL: http://svn.apache.org/viewvc?rev=1331455&view=rev
Log:
CAMEL-4309: Added option asyncStopListener to camel-jms to stop the JMS listener container using an async thread

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java
      - copied, changed from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java
      - copied, changed from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1331455&r1=1331454&r2=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Fri Apr 27 15:02:45 2012
@@ -54,7 +54,7 @@ public class JmsComponent extends Defaul
     private ApplicationContext applicationContext;
     private QueueBrowseStrategy queueBrowseStrategy;
     private HeaderFilterStrategy headerFilterStrategy = new JmsHeaderFilterStrategy();
-    private ExecutorService asyncStartExecutorService;
+    private ExecutorService asyncStartStopExecutorService;
 
     public JmsComponent() {
     }
@@ -324,6 +324,10 @@ public class JmsComponent extends Defaul
         getConfiguration().setAsyncStartListener(asyncStartListener);
     }
 
+    public void setAsyncStopListener(boolean asyncStopListener) {
+        getConfiguration().setAsyncStopListener(asyncStopListener);
+    }
+
     public void setForceSendOriginalMessage(boolean forceSendOriginalMessage) {
         getConfiguration().setForceSendOriginalMessage(forceSendOriginalMessage);
     }
@@ -404,20 +408,20 @@ public class JmsComponent extends Defaul
 
     @Override
     protected void doShutdown() throws Exception {
-        if (asyncStartExecutorService != null) {
-            getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartExecutorService);
-            asyncStartExecutorService = null;
+        if (asyncStartStopExecutorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
+            asyncStartStopExecutorService = null;
         }
         super.doShutdown();
     }
 
-    protected synchronized ExecutorService getAsyncStartExecutorService() {
-        if (asyncStartExecutorService == null) {
+    protected synchronized ExecutorService getAsyncStartStopExecutorService() {
+        if (asyncStartStopExecutorService == null) {
             // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread
             // for each task, and the thread pool will shrink when no more tasks running
-            asyncStartExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartListener");
+            asyncStartStopExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener");
         }
-        return asyncStartExecutorService;
+        return asyncStartStopExecutorService;
     }
 
     @Override

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1331455&r1=1331454&r2=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Fri Apr 27 15:02:45 2012
@@ -124,7 +124,8 @@ public class JmsConfiguration implements
     private boolean transferException;
     private boolean testConnectionOnStartup;
     private boolean asyncStartListener;
-    // if the message is a JmsMessage and mapJmsMessage=false, force the 
+    private boolean asyncStopListener;
+    // if the message is a JmsMessage and mapJmsMessage=false, force the
     // producer to send the javax.jms.Message body to the next JMS destination    
     private boolean forceSendOriginalMessage;
     // to force disabling time to live (works in both in-only or in-out mode)
@@ -1200,6 +1201,14 @@ public class JmsConfiguration implements
         this.asyncStartListener = asyncStartListener;
     }
 
+    public boolean isAsyncStopListener() {
+        return asyncStopListener;
+    }
+
+    public void setAsyncStopListener(boolean asyncStopListener) {
+        this.asyncStopListener = asyncStopListener;
+    }
+
     public boolean isTestConnectionOnStartup() {
         return testConnectionOnStartup;
     }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1331455&r1=1331454&r2=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Fri Apr 27 15:02:45 2012
@@ -115,7 +115,7 @@ public class JmsConsumer extends Default
         }
         
         if (getEndpoint().getConfiguration().isAsyncStartListener()) {
-            getEndpoint().getAsyncStartExecutorService().submit(new Runnable() {
+            getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
                 @Override
                 public void run() {
                     try {
@@ -151,19 +151,42 @@ public class JmsConsumer extends Default
         }
     }
 
-    @Override
-    protected void doStop() throws Exception {
+    protected void stopAndDestroyListenerContainer() {
         if (listenerContainer != null) {
             listenerContainer.stop();
             listenerContainer.destroy();
-            // TODO: The async destroy code does not work well see https://issues.apache.org/jira/browse/CAMEL-4309
-            // getEndpoint().destroyMessageListenerContainer(listenerContainer);
         }
-
         // null container and listener so they are fully re created if this consumer is restarted
         // then we will use updated configuration from jms endpoint that may have been managed using JMX
         listenerContainer = null;
         messageListener = null;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (listenerContainer != null) {
+            // stop on the shared async pool when asyncStopListener=true, otherwise stop in the calling thread
+            if (getEndpoint().getConfiguration().isAsyncStopListener()) {
+                getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            stopAndDestroyListenerContainer();
+                        } catch (Throwable e) {
+                            log.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "AsyncStopListenerTask[" + getDestinationName() + "]";
+                    }
+                });
+            } else {
+                stopAndDestroyListenerContainer();
+            }
+        }
+
+        super.doStop();
     }
 

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1331455&r1=1331454&r2=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Fri Apr 27 15:02:45 2012
@@ -469,12 +469,12 @@ public class JmsEndpoint extends Default
         return replyManagerExecutorService;
     }
     
-    protected ExecutorService getAsyncStartExecutorService() {
+    protected ExecutorService getAsyncStartStopExecutorService() {
         if (getComponent() == null) {
-            throw new IllegalStateException("AsyncStartListener requires JmsComponent to be configured on this endpoint: " + this);
+            throw new IllegalStateException("AsyncStartStopListener requires JmsComponent to be configured on this endpoint: " + this);
         }
         // use shared thread pool from component
-        return getComponent().getAsyncStartExecutorService();
+        return getComponent().getAsyncStartStopExecutorService();
     }
 
     /**
@@ -1096,6 +1096,16 @@ public class JmsEndpoint extends Default
     }
 
     @ManagedAttribute
+    public void setAsyncStopListener(boolean asyncStopListener) {
+        configuration.setAsyncStopListener(asyncStopListener);
+    }
+
+    @ManagedAttribute
+    public boolean isAsyncStopListener() {
+        return configuration.isAsyncStopListener();
+    }
+
+    @ManagedAttribute
     public String getReplyToType() {
         if (configuration.getReplyToType() != null) {
             return configuration.getReplyToType().name();

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java (from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java&r1=1331369&r2=1331455&rev=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartStopListenerTest.java Fri Apr 27 15:02:45 2012
@@ -31,7 +31,7 @@ import static org.apache.camel.component
 /**
  * Testing with async start listener
  */
-public class JmsAsyncStartListenerTest extends CamelTestSupport {
+public class JmsAsyncStartStopListenerTest extends CamelTestSupport {
 
     protected String componentName = "activemq";
 
@@ -54,6 +54,7 @@ public class JmsAsyncStartListenerTest e
         ConnectionFactory connectionFactory = CamelJmsTestHelper.createPersistentConnectionFactory();
         JmsComponent jms = jmsComponentAutoAcknowledge(connectionFactory);
         jms.setAsyncStartListener(true);
+        jms.setAsyncStopListener(true);
         camelContext.addComponent(componentName, jms);
 
         return camelContext;

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java (from r1331369, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java&r1=1331369&r2=1331455&rev=1331455&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStartListenerTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAsyncStopListenerTest.java Fri Apr 27 15:02:45 2012
@@ -29,14 +29,14 @@ import org.junit.Test;
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
 
 /**
- * Testing with async start listener
+ * Testing with async stop listener
  */
-public class JmsAsyncStartListenerTest extends CamelTestSupport {
+public class JmsAsyncStopListenerTest extends CamelTestSupport {
 
     protected String componentName = "activemq";
 
     @Test
-    public void testAsyncStartListener() throws Exception {
+    public void testAsyncStopListener() throws Exception {
         MockEndpoint result = getMockEndpoint("mock:result");
         result.expectedMessageCount(2);
 
@@ -53,7 +53,7 @@ public class JmsAsyncStartListenerTest e
         // so we need a persistent store in case no active consumers when we send the messages
         ConnectionFactory connectionFactory = CamelJmsTestHelper.createPersistentConnectionFactory();
         JmsComponent jms = jmsComponentAutoAcknowledge(connectionFactory);
-        jms.setAsyncStartListener(true);
+        jms.setAsyncStopListener(true);
         camelContext.addComponent(componentName, jms);
 
         return camelContext;