You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/08/07 16:48:26 UTC
[5/8] git commit: CAMEL-7667: camel-jms route consumers will not
accept new messages during stopping of CamelContext. This prevents issues
with JMS connections being attempted to re-connect due failover protocol and
during shutdown of the Camel app. This
CAMEL-7667: camel-jms route consumers will not accept new messages during stopping of CamelContext. This prevents issues with JMS connections being attempted to re-connect due failover protocol and during shutdown of the Camel app. This can lead to WARNs and ERRORs in the logs which we can avoid a bit more with letting the jms consumer shutdown quciker.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3d8ae863
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3d8ae863
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3d8ae863
Branch: refs/heads/master
Commit: 3d8ae86338598d279ae2e5bf58b2676d5237b3fc
Parents: 5707862
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Aug 7 15:57:41 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Aug 7 15:57:41 2014 +0200
----------------------------------------------------------------------
.../jms/DefaultJmsMessageListenerContainer.java | 37 ++++++++++++++++++--
.../ExclusiveQueueMessageListenerContainer.java | 3 +-
.../SharedQueueMessageListenerContainer.java | 5 +--
.../jms/reply/TemporaryQueueReplyManager.java | 3 +-
4 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
index 793bb75..ba3282e 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
@@ -35,15 +35,48 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerContainer {
private final JmsEndpoint endpoint;
+ private final boolean allowQuickStop;
public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint) {
+ this(endpoint, true);
+ }
+
+ public DefaultJmsMessageListenerContainer(JmsEndpoint endpoint, boolean allowQuickStop) {
this.endpoint = endpoint;
+ this.allowQuickStop = allowQuickStop;
+ }
+
+ /**
+ * Whether this {@link DefaultMessageListenerContainer} allows the {@link #runningAllowed()} to quick stop
+ * in case {@link JmsConfiguration#isAcceptMessagesWhileStopping()} is enabled, and {@link org.apache.camel.CamelContext}
+ * is currently being stopped.
+ */
+ protected boolean isAllowQuickStop() {
+ return allowQuickStop;
}
@Override
protected boolean runningAllowed() {
- // do not run if we have been stopped
- return endpoint.isRunning();
+ // we can stop quickly if CamelContext is being stopped, and we do not accept messages while stopping
+ // this allows a more cleanly shutdown of the message listener
+ boolean quickStop = false;
+ if (isAllowQuickStop() && !endpoint.isAcceptMessagesWhileStopping()) {
+ quickStop = endpoint.getCamelContext().getStatus().isStopping();
+ }
+
+ if (quickStop) {
+ // log at debug level so its quicker to see we are stopping quicker from the logs
+ logger.debug("runningAllowed() -> false due CamelContext is stopping and endpoint configured to not accept messages while stopping");
+ return false;
+ } else {
+ // otherwise we only run if the endpoint is running
+ boolean answer = endpoint.isRunning();
+ // log at trace level as otherwise this can be noisy during normal operation
+ if (logger.isTraceEnabled()) {
+ logger.trace("runningAllowed() -> " + answer);
+ }
+ return answer;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
index b572541..64b7bfe 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
@@ -37,6 +37,7 @@ public class ExclusiveQueueMessageListenerContainer extends DefaultJmsMessageLis
// no need to override any methods currently
public ExclusiveQueueMessageListenerContainer(JmsEndpoint endpoint) {
- super(endpoint);
+ // request-reply listener container should not allow quick-stop so we can keep listening for reply messages
+ super(endpoint, false);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
index f6464ff..3b682e0 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
@@ -48,7 +48,8 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen
* @param fixedMessageSelector the fixed selector
*/
public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, String fixedMessageSelector) {
- super(endpoint);
+ // request-reply listener container should not allow quick-stop so we can keep listening for reply messages
+ super(endpoint, false);
this.fixedMessageSelector = fixedMessageSelector;
}
@@ -59,7 +60,7 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen
* @param creator the create to create the dynamic selector
*/
public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, MessageSelectorCreator creator) {
- super(endpoint);
+ super(endpoint, false);
this.creator = creator;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3d8ae863/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index a5e8798..f7430eb 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -99,7 +99,8 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
@Override
protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
// Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193)
- DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint);
+ // request-reply listener container should not allow quick-stop so we can keep listening for reply messages
+ DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint, false);
answer.setDestinationName("temporary");
answer.setDestinationResolver(destResolver);