You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/06/04 09:06:57 UTC
svn commit: r1345844 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/
main/java/org/apache/camel/component/jms/reply/
test/java/org/apache/camel/component/jms/
Author: davsclaus
Date: Mon Jun 4 07:06:55 2012
New Revision: 1345844
URL: http://svn.apache.org/viewvc?rev=1345844&view=rev
Log:
CAMEL-5309: Fixed issue when reusing previous jms endpoint with old reply manager, when readding a route. The reply manager in use should be be be associated with its producer, and also tied to the lifecycle of the producer.
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java
- copied, changed from r1345686, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1345844&r1=1345843&r2=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Jun 4 07:06:55 2012
@@ -16,11 +16,8 @@
*/
package org.apache.camel.component.jms;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -43,16 +40,12 @@ import org.apache.camel.Service;
import org.apache.camel.ServiceStatus;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.jms.reply.PersistentQueueReplyManager;
-import org.apache.camel.component.jms.reply.ReplyManager;
-import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.SynchronousDelegateProducer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,10 +74,6 @@ public class JmsEndpoint extends Default
private Destination destination;
private String selector;
private JmsConfiguration configuration;
- private final Map<String, ReplyManager> replyToReplyManager = new HashMap<String, ReplyManager>();
- private ReplyManager replyManager;
- // scheduled executor to check for timeout (reply not received)
- private ScheduledExecutorService replyManagerExecutorService;
private final AtomicBoolean running = new AtomicBoolean();
private volatile boolean destroying;
@@ -177,14 +166,6 @@ public class JmsEndpoint extends Default
notifyAll();
}
}
- public void destroyMessageListenerContainer(final AbstractMessageListenerContainer listenerContainer) {
- destroying = true;
- this.getReplyManagerExecutorService().execute(new Runnable() {
- public void run() {
- destroyMessageListenerContainerInternal(listenerContainer);
- }
- });
- }
public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception {
return configuration.createMessageListenerContainer(this);
@@ -389,31 +370,6 @@ public class JmsEndpoint extends Default
return true;
}
- public synchronized ReplyManager getReplyManager() throws Exception {
- if (replyManager == null) {
- // use a temporary queue
- replyManager = new TemporaryQueueReplyManager();
- replyManager.setEndpoint(this);
- replyManager.setScheduledExecutorService(getReplyManagerExecutorService());
- ServiceHelper.startService(replyManager);
- }
- return replyManager;
- }
-
- public synchronized ReplyManager getReplyManager(String replyTo) throws Exception {
- ReplyManager answer = replyToReplyManager.get(replyTo);
- if (answer == null) {
- // use a persistent queue
- answer = new PersistentQueueReplyManager();
- answer.setEndpoint(this);
- answer.setScheduledExecutorService(getReplyManagerExecutorService());
- ServiceHelper.startService(answer);
- // remember this manager so we can re-use it
- replyToReplyManager.put(replyTo, answer);
- }
- return answer;
- }
-
public boolean isPubSubDomain() {
return pubSubDomain;
}
@@ -449,7 +405,6 @@ public class JmsEndpoint extends Default
return metadata;
}
-
/**
* Returns the {@link JmsOperations} used for metadata operations such as creating temporary destinations
*/
@@ -461,14 +416,6 @@ public class JmsEndpoint extends Default
return template;
}
- protected synchronized ScheduledExecutorService getReplyManagerExecutorService() {
- if (replyManagerExecutorService == null) {
- String name = "JmsReplyManagerTimeoutChecker[" + getEndpointConfiguredDestinationName() + "]";
- replyManagerExecutorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name);
- }
- return replyManagerExecutorService;
- }
-
protected ExecutorService getAsyncStartStopExecutorService() {
if (getComponent() == null) {
throw new IllegalStateException("AsyncStartStopListener requires JmsComponent to be configured on this endpoint: " + this);
@@ -492,23 +439,6 @@ public class JmsEndpoint extends Default
@Override
protected void doStop() throws Exception {
running.set(false);
-
- if (replyManager != null) {
- ServiceHelper.stopService(replyManager);
- replyManager = null;
- }
-
- if (!replyToReplyManager.isEmpty()) {
- for (ReplyManager replyManager : replyToReplyManager.values()) {
- ServiceHelper.stopService(replyManager);
- }
- replyToReplyManager.clear();
- }
-
- if (replyManagerExecutorService != null) {
- getCamelContext().getExecutorServiceManager().shutdownNow(replyManagerExecutorService);
- replyManagerExecutorService = null;
- }
}
// Delegated properties from the configuration
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1345844&r1=1345843&r2=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Mon Jun 4 07:06:55 2012
@@ -17,6 +17,7 @@
package org.apache.camel.component.jms;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -29,11 +30,14 @@ import org.apache.camel.Exchange;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
+import org.apache.camel.component.jms.reply.PersistentQueueReplyManager;
import org.apache.camel.component.jms.reply.ReplyManager;
+import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager;
import org.apache.camel.component.jms.reply.UseMessageIdAsCorrelationIdMessageSentCallback;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.spi.UuidGenerator;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.ValueHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +46,7 @@ import org.springframework.jms.core.Mess
import org.springframework.jms.support.JmsUtils;
import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
+
/**
* @version
*/
@@ -59,6 +64,11 @@ public class JmsProducer extends Default
this.endpoint = endpoint;
}
+ @Override
+ public JmsEndpoint getEndpoint() {
+ return (JmsEndpoint) super.getEndpoint();
+ }
+
protected void initReplyManager() {
if (!started.get()) {
synchronized (this) {
@@ -76,12 +86,12 @@ public class JmsProducer extends Default
}
if (endpoint.getReplyTo() != null) {
- replyManager = endpoint.getReplyManager(endpoint.getReplyTo());
+ replyManager = createReplyManager(endpoint.getReplyTo());
if (LOG.isDebugEnabled()) {
LOG.debug("Using JmsReplyManager: {} to process replies from: {}", replyManager, endpoint.getReplyTo());
}
} else {
- replyManager = endpoint.getReplyManager();
+ replyManager = createReplyManager();
LOG.debug("Using JmsReplyManager: {} to process replies from temporary queue", replyManager);
}
} catch (Exception e) {
@@ -92,6 +102,16 @@ public class JmsProducer extends Default
}
}
+ protected void unInitReplyManager() {
+ try {
+ ServiceHelper.stopService(replyManager);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ started.set(false);
+ }
+ }
+
public boolean process(Exchange exchange, AsyncCallback callback) {
// deny processing if we are not started
if (!isRunAllowed()) {
@@ -444,5 +464,35 @@ public class JmsProducer extends Default
protected void doStop() throws Exception {
super.doStop();
+
+ // must stop/un-init reply manager if it was in use
+ unInitReplyManager();
}
+
+ protected ReplyManager createReplyManager() throws Exception {
+ // use a temporary queue
+ ReplyManager replyManager = new TemporaryQueueReplyManager(getEndpoint().getCamelContext());
+ replyManager.setEndpoint(getEndpoint());
+
+ String name = "JmsReplyManagerTimeoutChecker[" + getEndpoint().getEndpointConfiguredDestinationName() + "]";
+ ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name);
+ replyManager.setScheduledExecutorService(replyManagerExecutorService);
+ ServiceHelper.startService(replyManager);
+
+ return replyManager;
+ }
+
+ protected ReplyManager createReplyManager(String replyTo) throws Exception {
+ // use a persistent queue
+ ReplyManager replyManager = new PersistentQueueReplyManager(getEndpoint().getCamelContext());
+ replyManager.setEndpoint(getEndpoint());
+
+ String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]";
+ ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name);
+ replyManager.setScheduledExecutorService(replyManagerExecutorService);
+ ServiceHelper.startService(replyManager);
+
+ return replyManager;
+ }
+
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1345844&r1=1345843&r2=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Mon Jun 4 07:06:55 2012
@@ -24,6 +24,7 @@ import javax.jms.Message;
import javax.jms.Session;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.jms.DefaultSpringErrorHandler;
import org.apache.camel.component.jms.ReplyToType;
@@ -41,6 +42,10 @@ public class PersistentQueueReplyManager
private String replyToSelectorValue;
private MessageSelectorCreator dynamicMessageSelector;
+ public PersistentQueueReplyManager(CamelContext camelContext) {
+ super(camelContext);
+ }
+
public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
String originalCorrelationId, String correlationId, long requestTimeout) {
// add to correlation map
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1345844&r1=1345843&r2=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Mon Jun 4 07:06:55 2012
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.component.jms.JmsEndpoint;
@@ -45,6 +46,7 @@ import org.springframework.jms.listener.
public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
protected final Logger log = LoggerFactory.getLogger(getClass());
+ protected final CamelContext camelContext;
protected ScheduledExecutorService executorService;
protected JmsEndpoint endpoint;
protected Destination replyTo;
@@ -53,6 +55,10 @@ public abstract class ReplyManagerSuppor
protected final long replyToTimeout = 10000;
protected CorrelationTimeoutMap correlation;
+ public ReplyManagerSupport(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
public void setScheduledExecutorService(ScheduledExecutorService executorService) {
this.executorService = executorService;
}
@@ -229,6 +235,12 @@ public abstract class ReplyManagerSuppor
listenerContainer.destroy();
listenerContainer = null;
}
+
+ // must also stop executor service
+ if (executorService != null) {
+ camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ executorService = null;
+ }
}
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1345844&r1=1345843&r2=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Mon Jun 4 07:06:55 2012
@@ -23,6 +23,7 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.jms.DefaultSpringErrorHandler;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -36,6 +37,10 @@ import org.springframework.jms.support.d
*/
public class TemporaryQueueReplyManager extends ReplyManagerSupport {
+ public TemporaryQueueReplyManager(CamelContext camelContext) {
+ super(camelContext);
+ }
+
public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
String originalCorrelationId, String correlationId, long requestTimeout) {
// add to correlation map
Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java (from r1345686, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java&r1=1345686&r2=1345844&rev=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java Mon Jun 4 07:06:55 2012
@@ -19,47 +19,40 @@ package org.apache.camel.component.jms;
import javax.jms.ConnectionFactory;
import org.apache.camel.CamelContext;
-import org.apache.camel.CamelExecutionException;
-import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
-import org.apache.camel.util.StopWatch;
import org.junit.Test;
import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
/**
- * Using exclusive fixed replyTo queues should be faster as there is no need for
- * JMSMessage selectors.
- *
- * @version
+ * @version
*/
-public class JmsRequestReplyExclusiveReplyToTest extends CamelTestSupport {
+public class JmsRequestReplyExclusiveReplyToRemoveAddRouteTest extends CamelTestSupport {
@Test
public void testJmsRequestReplyExclusiveFixedReplyTo() throws Exception {
- StopWatch watch = new StopWatch();
+ assertEquals("Hello A", template.requestBody("direct:start", "A"));
- assertEquals("Hello A", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "A"));
- assertEquals("Hello B", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "B"));
- assertEquals("Hello C", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "C"));
- assertEquals("Hello D", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "D"));
- assertEquals("Hello E", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "E"));
+ // stop and remove route
+ context.stopRoute("start");
+ context.removeRoute("start");
- long delta = watch.stop();
- assertTrue("Should be faster than about 4 seconds, was: " + delta, delta < 4200);
- }
+ // add new route using same jms endpoint uri
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start2").routeId("start2")
+ .to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
+ .to("log:start2");
+ }
+ });
+ // and it should still work
- @Test
- public void testInvalidConfiguration() throws Exception {
- try {
- template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Temporary", "Hello World");
- fail("Should have thrown exception");
- } catch (CamelExecutionException e) {
- assertIsInstanceOf(FailedToCreateProducerException.class, e.getCause());
- assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause());
- assertEquals("ReplyToType Temporary is not supported when replyTo bar is also configured.", e.getCause().getCause().getMessage());
- }
+ assertEquals("Hello B", template.requestBody("direct:start2", "B"));
+ assertEquals("Hello C", template.requestBody("direct:start2", "C"));
+ assertEquals("Hello D", template.requestBody("direct:start2", "D"));
+ assertEquals("Hello E", template.requestBody("direct:start2", "E"));
}
protected CamelContext createCamelContext() throws Exception {
@@ -74,7 +67,11 @@ public class JmsRequestReplyExclusiveRep
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("activemq:queue:foo")
+ from("direct:start").routeId("start")
+ .to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
+ .to("log:start");
+
+ from("activemq:queue:foo").routeId("foo")
.transform(body().prepend("Hello "));
}
};