You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/08/07 16:48:26 UTC

[5/8] git commit: CAMEL-7667: camel-jms route consumers will not accept new messages during stopping of CamelContext. This prevents issues with JMS connections being attempted to re-connect due failover protocol and during shutdown of the Camel app. This

CAMEL-7667: camel-jms route consumers will not accept new messages during stopping of CamelContext. This prevents issues with JMS connections being attempted to re-connect due failover protocol and during shutdown of the Camel app. This can lead to WARNs and ERRORs in the logs which we can avoid a bit more with letting the jms consumer shutdown quciker.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3d8ae863
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3d8ae863
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3d8ae863

Branch: refs/heads/master
Commit: 3d8ae86338598d279ae2e5bf58b2676d5237b3fc
Parents: 5707862
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Aug 7 15:57:41 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Aug 7 15:57:41 2014 +0200

----------------------------------------------------------------------
 .../jms/DefaultJmsMessageListenerContainer.java | 37 ++++++++++++++++++--
 .../ExclusiveQueueMessageListenerContainer.java |  3 +-
 .../SharedQueueMessageListenerContainer.java    |  5 +--
 .../jms/reply/TemporaryQueueReplyManager.java   |  3 +-
 4 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
index 793bb75..ba3282e 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
@@ -35,15 +35,48 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerContainer {
 
     private final JmsEndpoint endpoint;
+    private final boolean allowQuickStop;
 
     public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint) {
+        this(endpoint, true);
+    }
+
+    public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint, boolean allowQuickStop) {
         this.endpoint = endpoint;
+        this.allowQuickStop = allowQuickStop;
+    }
+
+    /**
+     * Whether this {@link DefaultMessageListenerContainer} allows the {@link #runningAllowed()} to quick stop
+     * in case {@link JmsConfiguration#isAcceptMessagesWhileStopping()} is enabled, and {@link org.apache.camel.CamelContext}
+     * is currently being stopped.
+     */
+    protected boolean isAllowQuickStop() {
+        return allowQuickStop;
     }
 
     @Override
     protected boolean runningAllowed() {
-        // do not run if we have been stopped
-        return endpoint.isRunning();
+        // we can stop quickly if CamelContext is being stopped, and we do not accept messages while stopping
+        // this allows a more cleanly shutdown of the message listener
+        boolean quickStop = false;
+        if (isAllowQuickStop() && !endpoint.isAcceptMessagesWhileStopping()) {
+            quickStop = endpoint.getCamelContext().getStatus().isStopping();
+        }
+
+        if (quickStop) {
+            // log at debug level so its quicker to see we are stopping quicker from the logs
+            logger.debug("runningAllowed() -> false due CamelContext is stopping and endpoint configured to not accept messages while stopping");
+            return false;
+        } else {
+            // otherwise we only run if the endpoint is running
+            boolean answer = endpoint.isRunning();
+            // log at trace level as otherwise this can be noisy during normal operation
+            if (logger.isTraceEnabled()) {
+                logger.trace("runningAllowed() -> " + answer);
+            }
+            return answer;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
index b572541..64b7bfe 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
@@ -37,6 +37,7 @@ public class ExclusiveQueueMessageListenerContainer extends DefaultJmsMessageLis
     // no need to override any methods currently
 
     public ExclusiveQueueMessageListenerContainer(JmsEndpoint endpoint) {
-        super(endpoint);
+        // request-reply listener container should not allow quick-stop so we can keep listening for reply messages
+        super(endpoint, false);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
index f6464ff..3b682e0 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
@@ -48,7 +48,8 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen
      * @param fixedMessageSelector the fixed selector
      */
     public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, String fixedMessageSelector) {
-        super(endpoint);
+        // request-reply listener container should not allow quick-stop so we can keep listening for reply messages
+        super(endpoint, false);
         this.fixedMessageSelector = fixedMessageSelector;
     }
 
@@ -59,7 +60,7 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen
      * @param creator the create to create the dynamic selector
      */
     public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, MessageSelectorCreator creator) {
-        super(endpoint);
+        super(endpoint, false);
         this.creator = creator;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index a5e8798..f7430eb 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -99,7 +99,8 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
     @Override
     protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
         // Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193)
-        DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint);
+        // request-reply listener container should not allow quick-stop so we can keep listening for reply messages
+        DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint, false);
 
         answer.setDestinationName("temporary");
         answer.setDestinationResolver(destResolver);