You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/31 12:05:29 UTC
[3/5] git commit: CAMEL-6093: Fixed having 2+ routes from the same
JMS queue,
not stop the endpoint if there are still active listeners when a route is
stopped.
CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there are still active listeners when a route is stopped.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e21ec70
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e21ec70
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e21ec70
Branch: refs/heads/camel-2.12.x
Commit: 2e21ec7012d3ab35ab8c344de0366436e4918bc5
Parents: 16d8180
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Aug 31 11:29:25 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Aug 31 11:45:13 2013 +0200
----------------------------------------------------------------------
.../apache/camel/component/jms/JmsConsumer.java | 11 ++++--
.../apache/camel/component/jms/JmsEndpoint.java | 37 ++++++++++++++++----
.../jms/reply/ReplyManagerSupport.java | 12 +++++--
.../jms/TwoConsumerOnSameQueueTest.java | 1 -
4 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
index 50b7833..7bf6ab5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
@@ -133,7 +133,8 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
if (listenerContainer == null) {
createMessageListenerContainer();
}
-
+ getEndpoint().onListenerContainerStarting(listenerContainer);
+
if (getEndpoint().getConfiguration().isAsyncStartListener()) {
getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
@Override
@@ -173,8 +174,12 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
protected void stopAndDestroyListenerContainer() {
if (listenerContainer != null) {
- listenerContainer.stop();
- listenerContainer.destroy();
+ try {
+ listenerContainer.stop();
+ listenerContainer.destroy();
+ } finally {
+ getEndpoint().onListenerConstainerStopped(listenerContainer);
+ }
}
// null container and listener so they are fully re created if this consumer is restarted
// then we will use updated configuration from jms endpoint that may have been managed using JMX
http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index 701de7c..664da7c 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.jms;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
@@ -71,6 +71,7 @@ import org.springframework.util.ErrorHandler;
@UriEndpoint(scheme = "jms", consumerClass = JmsConsumer.class)
public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
protected final Logger log = LoggerFactory.getLogger(getClass());
+ private final AtomicInteger runningMessageListeners = new AtomicInteger();
@UriParam
private HeaderFilterStrategy headerFilterStrategy;
private boolean pubSubDomain;
@@ -82,7 +83,6 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
private String selector;
@UriParam
private JmsConfiguration configuration;
- private final AtomicBoolean running = new AtomicBoolean();
public JmsEndpoint() {
this(null, null);
@@ -442,21 +442,39 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
return getComponent().getAsyncStartStopExecutorService();
}
+ public void onListenerContainerStarting(AbstractMessageListenerContainer container) {
+ runningMessageListeners.incrementAndGet();
+ }
+
+ public void onListenerConstainerStopped(AbstractMessageListenerContainer container) {
+ runningMessageListeners.decrementAndGet();
+ }
+
/**
* State whether this endpoint is running (eg started)
*/
protected boolean isRunning() {
- return running.get();
+ return isStarted();
}
@Override
- protected void doStart() throws Exception {
- running.set(true);
+ public void stop() throws Exception {
+ int running = runningMessageListeners.get();
+ if (running <= 0) {
+ super.stop();
+ } else {
+ log.trace("There are still {} running message listeners. Cannot stop endpoint {}", running, this);
+ }
}
@Override
- protected void doStop() throws Exception {
- running.set(false);
+ public void shutdown() throws Exception {
+ int running = runningMessageListeners.get();
+ if (running <= 0) {
+ super.shutdown();
+ } else {
+ log.trace("There are still {} running message listeners. Cannot shutdown endpoint {}", running, this);
+ }
}
// Delegated properties from the configuration
@@ -1146,6 +1164,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
return status.name();
}
+ @ManagedAttribute(description = "Number of running message listeners")
+ public int getRunningMessageListeners() {
+ return runningMessageListeners.get();
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 3828926..173d9c8 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -224,6 +224,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
listenerContainer = createListenerContainer();
listenerContainer.afterPropertiesSet();
log.debug("Starting reply listener container on endpoint: {}", endpoint);
+
+ endpoint.onListenerContainerStarting(listenerContainer);
listenerContainer.start();
}
@@ -233,9 +235,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
if (listenerContainer != null) {
log.debug("Stopping reply listener container on endpoint: {}", endpoint);
- listenerContainer.stop();
- listenerContainer.destroy();
- listenerContainer = null;
+ try {
+ listenerContainer.stop();
+ listenerContainer.destroy();
+ } finally {
+ endpoint.onListenerConstainerStopped(listenerContainer);
+ listenerContainer = null;
+ }
}
// must also stop executor service
http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
index 3cdfd9e..55f73db 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
@@ -62,7 +62,6 @@ public class TwoConsumerOnSameQueueTest extends CamelTestSupport {
}
@Test
- @Ignore
public void testRemoveOneRoute() throws Exception {
sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();