You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/31 12:05:29 UTC

[3/5] git commit: CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there are still active listeners when a route is stopped.

CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there are still active listeners when a route is stopped.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e21ec70
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e21ec70
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e21ec70

Branch: refs/heads/camel-2.12.x
Commit: 2e21ec7012d3ab35ab8c344de0366436e4918bc5
Parents: 16d8180
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Aug 31 11:29:25 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Aug 31 11:45:13 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsConsumer.java | 11 ++++--
 .../apache/camel/component/jms/JmsEndpoint.java | 37 ++++++++++++++++----
 .../jms/reply/ReplyManagerSupport.java          | 12 +++++--
 .../jms/TwoConsumerOnSameQueueTest.java         |  1 -
 4 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
index 50b7833..7bf6ab5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
@@ -133,7 +133,8 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
         if (listenerContainer == null) {
             createMessageListenerContainer();
         }
-        
+        getEndpoint().onListenerContainerStarting(listenerContainer);
+
         if (getEndpoint().getConfiguration().isAsyncStartListener()) {
             getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
                 @Override
@@ -173,8 +174,12 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
 
     protected void stopAndDestroyListenerContainer() {
         if (listenerContainer != null) {
-            listenerContainer.stop();
-            listenerContainer.destroy();
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                getEndpoint().onListenerConstainerStopped(listenerContainer);
+            }
         }
         // null container and listener so they are fully re created if this consumer is restarted
         // then we will use updated configuration from jms endpoint that may have been managed using JMX

http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index 701de7c..664da7c 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.jms;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -71,6 +71,7 @@ import org.springframework.util.ErrorHandler;
 @UriEndpoint(scheme = "jms", consumerClass = JmsConsumer.class)
 public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
     protected final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicInteger runningMessageListeners = new AtomicInteger();
     @UriParam
     private HeaderFilterStrategy headerFilterStrategy;
     private boolean pubSubDomain;
@@ -82,7 +83,6 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
     private String selector;
     @UriParam
     private JmsConfiguration configuration;
-    private final AtomicBoolean running = new AtomicBoolean();
 
     public JmsEndpoint() {
         this(null, null);
@@ -442,21 +442,39 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return getComponent().getAsyncStartStopExecutorService();
     }
 
+    public void onListenerContainerStarting(AbstractMessageListenerContainer container) {
+        runningMessageListeners.incrementAndGet();
+    }
+
+    public void onListenerConstainerStopped(AbstractMessageListenerContainer container) {
+        runningMessageListeners.decrementAndGet();
+    }
+
     /**
      * State whether this endpoint is running (eg started)
      */
     protected boolean isRunning() {
-        return running.get();
+        return isStarted();
     }
 
     @Override
-    protected void doStart() throws Exception {
-        running.set(true);
+    public void stop() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.stop();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot stop endpoint {}", running, this);
+        }
     }
 
     @Override
-    protected void doStop() throws Exception {
-        running.set(false);
+    public void shutdown() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.shutdown();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot shutdown endpoint {}", running, this);
+        }
     }
 
     // Delegated properties from the configuration
@@ -1146,6 +1164,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return status.name();
     }
 
+    @ManagedAttribute(description = "Number of running message listeners")
+    public int getRunningMessageListeners() {
+        return runningMessageListeners.get();
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 3828926..173d9c8 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -224,6 +224,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
         listenerContainer = createListenerContainer();
         listenerContainer.afterPropertiesSet();
         log.debug("Starting reply listener container on endpoint: {}", endpoint);
+
+        endpoint.onListenerContainerStarting(listenerContainer);
         listenerContainer.start();
     }
 
@@ -233,9 +235,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
 
         if (listenerContainer != null) {
             log.debug("Stopping reply listener container on endpoint: {}", endpoint);
-            listenerContainer.stop();
-            listenerContainer.destroy();
-            listenerContainer = null;
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                endpoint.onListenerConstainerStopped(listenerContainer);
+                listenerContainer = null;
+            }
         }
 
         // must also stop executor service

http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
index 3cdfd9e..55f73db 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
@@ -62,7 +62,6 @@ public class TwoConsumerOnSameQueueTest extends CamelTestSupport {
     }
 
     @Test
-    @Ignore
     public void testRemoveOneRoute() throws Exception {
         sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();