You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/06/04 09:06:57 UTC

svn commit: r1345844 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ main/java/org/apache/camel/component/jms/reply/ test/java/org/apache/camel/component/jms/

Author: davsclaus
Date: Mon Jun  4 07:06:55 2012
New Revision: 1345844

URL: http://svn.apache.org/viewvc?rev=1345844&view=rev
Log:
CAMEL-5309: Fixed issue when reusing previous jms endpoint with old reply manager, when readding a route. The reply manager in use should be be be associated with its producer, and also tied to the lifecycle of the producer.

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java
      - copied, changed from r1345686, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1345844&r1=1345843&r2=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Jun  4 07:06:55 2012
@@ -16,11 +16,8 @@
  */
 package org.apache.camel.component.jms;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -43,16 +40,12 @@ import org.apache.camel.Service;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.jms.reply.PersistentQueueReplyManager;
-import org.apache.camel.component.jms.reply.ReplyManager;
-import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,10 +74,6 @@ public class JmsEndpoint extends Default
     private Destination destination;
     private String selector;
     private JmsConfiguration configuration;
-    private final Map<String, ReplyManager> replyToReplyManager = new HashMap<String, ReplyManager>();
-    private ReplyManager replyManager;
-    // scheduled executor to check for timeout (reply not received)
-    private ScheduledExecutorService replyManagerExecutorService;
     private final AtomicBoolean running = new AtomicBoolean();
     private volatile boolean destroying;
 
@@ -177,14 +166,6 @@ public class JmsEndpoint extends Default
             notifyAll();
         }
     }
-    public void destroyMessageListenerContainer(final AbstractMessageListenerContainer listenerContainer) {
-        destroying = true;
-        this.getReplyManagerExecutorService().execute(new Runnable() {
-            public void run() {
-                destroyMessageListenerContainerInternal(listenerContainer);
-            }
-        });
-    }
 
     public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception {
         return configuration.createMessageListenerContainer(this);
@@ -389,31 +370,6 @@ public class JmsEndpoint extends Default
         return true;
     }
 
-    public synchronized ReplyManager getReplyManager() throws Exception {
-        if (replyManager == null) {
-            // use a temporary queue
-            replyManager = new TemporaryQueueReplyManager();
-            replyManager.setEndpoint(this);
-            replyManager.setScheduledExecutorService(getReplyManagerExecutorService());
-            ServiceHelper.startService(replyManager);
-        }
-        return replyManager;
-    }
-
-    public synchronized ReplyManager getReplyManager(String replyTo) throws Exception {
-        ReplyManager answer = replyToReplyManager.get(replyTo);
-        if (answer == null) {
-            // use a persistent queue
-            answer = new PersistentQueueReplyManager();
-            answer.setEndpoint(this);
-            answer.setScheduledExecutorService(getReplyManagerExecutorService());
-            ServiceHelper.startService(answer);
-            // remember this manager so we can re-use it
-            replyToReplyManager.put(replyTo, answer);
-        }
-        return answer;
-    }
-
     public boolean isPubSubDomain() {
         return pubSubDomain;
     }
@@ -449,7 +405,6 @@ public class JmsEndpoint extends Default
         return metadata;
     }
 
-
     /**
      * Returns the {@link JmsOperations} used for metadata operations such as creating temporary destinations
      */
@@ -461,14 +416,6 @@ public class JmsEndpoint extends Default
         return template;
     }
 
-    protected synchronized ScheduledExecutorService getReplyManagerExecutorService() {
-        if (replyManagerExecutorService == null) {
-            String name = "JmsReplyManagerTimeoutChecker[" + getEndpointConfiguredDestinationName() + "]";
-            replyManagerExecutorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name);
-        }
-        return replyManagerExecutorService;
-    }
-    
     protected ExecutorService getAsyncStartStopExecutorService() {
         if (getComponent() == null) {
             throw new IllegalStateException("AsyncStartStopListener requires JmsComponent to be configured on this endpoint: " + this);
@@ -492,23 +439,6 @@ public class JmsEndpoint extends Default
     @Override
     protected void doStop() throws Exception {
         running.set(false);
-
-        if (replyManager != null) {
-            ServiceHelper.stopService(replyManager);
-            replyManager = null;
-        }
-
-        if (!replyToReplyManager.isEmpty()) {
-            for (ReplyManager replyManager : replyToReplyManager.values()) {
-                ServiceHelper.stopService(replyManager);
-            }
-            replyToReplyManager.clear();
-        }
-
-        if (replyManagerExecutorService != null) {
-            getCamelContext().getExecutorServiceManager().shutdownNow(replyManagerExecutorService);
-            replyManagerExecutorService = null;
-        }
     }
 
     // Delegated properties from the configuration

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1345844&r1=1345843&r2=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Mon Jun  4 07:06:55 2012
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms;
 
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -29,11 +30,14 @@ import org.apache.camel.Exchange;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
+import org.apache.camel.component.jms.reply.PersistentQueueReplyManager;
 import org.apache.camel.component.jms.reply.ReplyManager;
+import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager;
 import org.apache.camel.component.jms.reply.UseMessageIdAsCorrelationIdMessageSentCallback;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.spi.UuidGenerator;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.ValueHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +46,7 @@ import org.springframework.jms.core.Mess
 import org.springframework.jms.support.JmsUtils;
 
 import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
+
 /**
  * @version 
  */
@@ -59,6 +64,11 @@ public class JmsProducer extends Default
         this.endpoint = endpoint;
     }
 
+    @Override
+    public JmsEndpoint getEndpoint() {
+        return (JmsEndpoint) super.getEndpoint();
+    }
+
     protected void initReplyManager() {
         if (!started.get()) {
             synchronized (this) {
@@ -76,12 +86,12 @@ public class JmsProducer extends Default
                     }
 
                     if (endpoint.getReplyTo() != null) {
-                        replyManager = endpoint.getReplyManager(endpoint.getReplyTo());
+                        replyManager = createReplyManager(endpoint.getReplyTo());
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Using JmsReplyManager: {} to process replies from: {}", replyManager, endpoint.getReplyTo());
                         }
                     } else {
-                        replyManager = endpoint.getReplyManager();
+                        replyManager = createReplyManager();
                         LOG.debug("Using JmsReplyManager: {} to process replies from temporary queue", replyManager);
                     }
                 } catch (Exception e) {
@@ -92,6 +102,16 @@ public class JmsProducer extends Default
         }
     }
 
+    protected void unInitReplyManager() {
+        try {
+            ServiceHelper.stopService(replyManager);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            started.set(false);
+        }
+    }
+
     public boolean process(Exchange exchange, AsyncCallback callback) {
         // deny processing if we are not started
         if (!isRunAllowed()) {
@@ -444,5 +464,35 @@ public class JmsProducer extends Default
 
     protected void doStop() throws Exception {
         super.doStop();
+
+        // must stop/un-init reply manager if it was in use
+        unInitReplyManager();
     }
+
+    protected ReplyManager createReplyManager() throws Exception {
+        // use a temporary queue
+        ReplyManager replyManager = new TemporaryQueueReplyManager(getEndpoint().getCamelContext());
+        replyManager.setEndpoint(getEndpoint());
+
+        String name = "JmsReplyManagerTimeoutChecker[" + getEndpoint().getEndpointConfiguredDestinationName() + "]";
+        ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name);
+        replyManager.setScheduledExecutorService(replyManagerExecutorService);
+        ServiceHelper.startService(replyManager);
+
+        return replyManager;
+    }
+
+    protected ReplyManager createReplyManager(String replyTo) throws Exception {
+        // use a persistent queue
+        ReplyManager replyManager = new PersistentQueueReplyManager(getEndpoint().getCamelContext());
+        replyManager.setEndpoint(getEndpoint());
+
+        String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]";
+        ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name);
+        replyManager.setScheduledExecutorService(replyManagerExecutorService);
+        ServiceHelper.startService(replyManager);
+
+        return replyManager;
+    }
+
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1345844&r1=1345843&r2=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Mon Jun  4 07:06:55 2012
@@ -24,6 +24,7 @@ import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.jms.DefaultSpringErrorHandler;
 import org.apache.camel.component.jms.ReplyToType;
@@ -41,6 +42,10 @@ public class PersistentQueueReplyManager
     private String replyToSelectorValue;
     private MessageSelectorCreator dynamicMessageSelector;
 
+    public PersistentQueueReplyManager(CamelContext camelContext) {
+        super(camelContext);
+    }
+
     public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
                                 String originalCorrelationId, String correlationId, long requestTimeout) {
         // add to correlation map

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1345844&r1=1345843&r2=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Mon Jun  4 07:06:55 2012
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.component.jms.JmsEndpoint;
@@ -45,6 +46,7 @@ import org.springframework.jms.listener.
 public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
+    protected final CamelContext camelContext;
     protected ScheduledExecutorService executorService;
     protected JmsEndpoint endpoint;
     protected Destination replyTo;
@@ -53,6 +55,10 @@ public abstract class ReplyManagerSuppor
     protected final long replyToTimeout = 10000;
     protected CorrelationTimeoutMap correlation;
 
+    public ReplyManagerSupport(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
     public void setScheduledExecutorService(ScheduledExecutorService executorService) {
         this.executorService = executorService;
     }
@@ -229,6 +235,12 @@ public abstract class ReplyManagerSuppor
             listenerContainer.destroy();
             listenerContainer = null;
         }
+
+        // must also stop executor service
+        if (executorService != null) {
+            camelContext.getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
     }
 
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1345844&r1=1345843&r2=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Mon Jun  4 07:06:55 2012
@@ -23,6 +23,7 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.jms.DefaultSpringErrorHandler;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -36,6 +37,10 @@ import org.springframework.jms.support.d
  */
 public class TemporaryQueueReplyManager extends ReplyManagerSupport {
 
+    public TemporaryQueueReplyManager(CamelContext camelContext) {
+        super(camelContext);
+    }
+
     public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
                                 String originalCorrelationId, String correlationId, long requestTimeout) {
         // add to correlation map

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java (from r1345686, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java&r1=1345686&r2=1345844&rev=1345844&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java Mon Jun  4 07:06:55 2012
@@ -19,47 +19,40 @@ package org.apache.camel.component.jms;
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.CamelExecutionException;
-import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
-import org.apache.camel.util.StopWatch;
 import org.junit.Test;
 
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
 
 /**
- * Using exclusive fixed replyTo queues should be faster as there is no need for
- * JMSMessage selectors.
- *
- * @version 
+ * @version
  */
-public class JmsRequestReplyExclusiveReplyToTest extends CamelTestSupport {
+public class JmsRequestReplyExclusiveReplyToRemoveAddRouteTest extends CamelTestSupport {
 
     @Test
     public void testJmsRequestReplyExclusiveFixedReplyTo() throws Exception {
-        StopWatch watch = new StopWatch();
+        assertEquals("Hello A", template.requestBody("direct:start", "A"));
 
-        assertEquals("Hello A", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "A"));
-        assertEquals("Hello B", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "B"));
-        assertEquals("Hello C", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "C"));
-        assertEquals("Hello D", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "D"));
-        assertEquals("Hello E", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "E"));
+        // stop and remove route
+        context.stopRoute("start");
+        context.removeRoute("start");
 
-        long delta = watch.stop();
-        assertTrue("Should be faster than about 4 seconds, was: " + delta, delta < 4200);
-    }
+        // add new route using same jms endpoint uri
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start2").routeId("start2")
+                        .to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
+                        .to("log:start2");
+            }
+        });
+        // and it should still work
 
-    @Test
-    public void testInvalidConfiguration() throws Exception {
-        try {
-            template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Temporary", "Hello World");
-            fail("Should have thrown exception");
-        } catch (CamelExecutionException e) {
-            assertIsInstanceOf(FailedToCreateProducerException.class, e.getCause());
-            assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause());
-            assertEquals("ReplyToType Temporary is not supported when replyTo bar is also configured.", e.getCause().getCause().getMessage());
-        }
+        assertEquals("Hello B", template.requestBody("direct:start2", "B"));
+        assertEquals("Hello C", template.requestBody("direct:start2", "C"));
+        assertEquals("Hello D", template.requestBody("direct:start2", "D"));
+        assertEquals("Hello E", template.requestBody("direct:start2", "E"));
     }
 
     protected CamelContext createCamelContext() throws Exception {
@@ -74,7 +67,11 @@ public class JmsRequestReplyExclusiveRep
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("activemq:queue:foo")
+                from("direct:start").routeId("start")
+                    .to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
+                    .to("log:start");
+
+                from("activemq:queue:foo").routeId("foo")
                     .transform(body().prepend("Hello "));
             }
         };