You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ra...@apache.org on 2013/01/10 00:49:26 UTC

svn commit: r1431152 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java

Author: raulk
Date: Wed Jan  9 23:49:26 2013
New Revision: 1431152

URL: http://svn.apache.org/viewvc?rev=1431152&view=rev
Log:
CAMEL-5865 Enhanced concurrent consumers support for JMS producers using Temp Reply Queue for replies

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1431152&r1=1431151&r2=1431152&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Wed Jan  9 23:49:26 2013
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.component.jms.reply;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
@@ -37,11 +40,23 @@ import org.springframework.jms.support.d
  * @version 
  */
 public class TemporaryQueueReplyManager extends ReplyManagerSupport {
-
+    
+    final TemporaryReplyQueueDestinationResolver destResolver = new TemporaryReplyQueueDestinationResolver();
+    
     public TemporaryQueueReplyManager(CamelContext camelContext) {
         super(camelContext);
     }
 
+    @Override
+    public Destination getReplyTo() {
+        try {
+            destResolver.destinationReady();
+        } catch (InterruptedException e) {
+            log.warn("Interrupted while waiting for JMSReplyTo destination refresh", e);
+        }
+        return super.getReplyTo();
+    }
+    
     public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
                                 String originalCorrelationId, String correlationId, long requestTimeout) {
         // add to correlation map
@@ -90,15 +105,7 @@ public class TemporaryQueueReplyManager 
         DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint);
 
         answer.setDestinationName("temporary");
-        answer.setDestinationResolver(new DestinationResolver() {
-            public Destination resolveDestinationName(Session session, String destinationName,
-                                                      boolean pubSubDomain) throws JMSException {
-                // use a temporary queue to gather the reply message
-                TemporaryQueue queue = session.createTemporaryQueue();
-                setReplyTo(queue);
-                return queue;
-            }
-        });
+        answer.setDestinationResolver(destResolver);
         answer.setAutoStartup(true);
         if (endpoint.getMaxMessagesPerTask() >= 0) {
             answer.setMaxMessagesPerTask(endpoint.getMaxMessagesPerTask());
@@ -113,6 +120,9 @@ public class TemporaryQueueReplyManager 
             answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers());
         }
         answer.setConnectionFactory(endpoint.getConnectionFactory());
+        // we use CACHE_CONSUMER to cling to the consumer as long as we can, since we can only consume
+        // msgs from the JMS Connection that created the temp destination in the first place
+        answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
         String clientId = endpoint.getClientId();
         if (clientId != null) {
             clientId += ".CamelReplyManager";
@@ -121,11 +131,10 @@ public class TemporaryQueueReplyManager 
 
         // we cannot do request-reply over JMS with transaction
         answer.setSessionTransacted(false);
-
+        
         // other optional properties
-        if (endpoint.getExceptionListener() != null) {
-            answer.setExceptionListener(endpoint.getExceptionListener());
-        }
+        answer.setExceptionListener(new TemporaryReplyQueueExceptionListener(destResolver, endpoint.getExceptionListener()));
+
         if (endpoint.getErrorHandler() != null) {
             answer.setErrorHandler(endpoint.getErrorHandler());
         } else {
@@ -144,8 +153,9 @@ public class TemporaryQueueReplyManager 
             answer.setTaskExecutor(endpoint.getTaskExecutor());
         }
 
-        // setup a bean name which is used ny Spring JMS as the thread name
-        String name = "TemporaryQueueReplyManager[" + answer.getDestinationName() + "]";
+        // setup a bean name which is used by Spring JMS as the thread name
+        // use the name of the request destination
+        String name = "TemporaryQueueReplyManager[" + endpoint.getDestinationName() + "]";
         answer.setBeanName(name);
 
         if (answer.getConcurrentConsumers() > 1) {
@@ -156,4 +166,60 @@ public class TemporaryQueueReplyManager 
         return answer;
     }
 
+    private final class TemporaryReplyQueueExceptionListener implements ExceptionListener {
+        private final TemporaryReplyQueueDestinationResolver destResolver;
+        private final ExceptionListener delegate;
+
+        private TemporaryReplyQueueExceptionListener(TemporaryReplyQueueDestinationResolver destResolver, 
+                ExceptionListener delegate) {
+            this.destResolver = destResolver;
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void onException(JMSException exception) {
+            // capture exceptions, and schedule a refresh of the ReplyTo destination
+            log.warn("Exception inside the DMLC for Temporary ReplyTo Queue for destination " + endpoint.getDestinationName() +
+            		", refreshing ReplyTo destination", exception);
+            destResolver.scheduleRefresh();
+            // serve as a proxy for any exception listener the user may have set explicitly
+            if (delegate != null) {
+                delegate.onException(exception);
+            }
+        }
+
+    }
+
+    private final class TemporaryReplyQueueDestinationResolver implements DestinationResolver {
+        private TemporaryQueue queue;
+        private AtomicBoolean refreshWanted = new AtomicBoolean(false);
+
+        public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) 
+                throws JMSException {
+            // use a temporary queue to gather the reply message
+            synchronized (refreshWanted) {
+                if (queue == null || refreshWanted.compareAndSet(true, false)) {
+                    queue = session.createTemporaryQueue();
+                    setReplyTo(queue);
+                    log.debug("Refreshed Temporary ReplyTo Queue. New queue: " + queue.getQueueName());
+                    refreshWanted.notifyAll();
+                }
+            }
+            return queue;
+        }
+        
+        public void scheduleRefresh() {
+            refreshWanted.set(true);
+        }
+        
+        public void destinationReady() throws InterruptedException {
+            if (refreshWanted.get()) {
+                synchronized (refreshWanted) {
+                    log.debug("Waiting for new Temp ReplyTo destination to be assigned to continue");
+                    refreshWanted.wait();
+                }
+            }
+        }
+    }
+
 }

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java?rev=1431152&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java Wed Jan  9 23:49:26 2013
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * Reliability tests for JMS TempQueue Reply Manager with multiple consumers.
+ * @version 
+ */
+public class JmsRequestReplyTempQueueMultipleConsumersTest extends CamelTestSupport {
+
+    private Map<String, AtomicInteger> msgsPerThread = new ConcurrentHashMap<String, AtomicInteger>();
+    private BrokerService broker;
+    private PooledConnectionFactory connectionFactory;
+    
+    @Test
+    public void testMultipleConsumingThreads() throws Exception {
+        doSendMessages(1000, 5);
+        assertTrue("Expected multiple consuming threads, but only found: " +  msgsPerThread.keySet().size(), 
+                msgsPerThread.keySet().size() > 1);
+    }
+    
+    @Test
+    public void testTempQueueRefreshed() throws Exception {
+        doSendMessages(500, 5);
+        connectionFactory.clear();
+        doSendMessages(100, 5);
+        connectionFactory.clear();
+        doSendMessages(100, 5);
+        connectionFactory.clear();
+        doSendMessages(100, 5);
+        connectionFactory.clear();
+        doSendMessages(100, 5);
+        connectionFactory.clear();
+        doSendMessages(100, 5);
+    }
+
+    private void doSendMessages(int files, int poolSize) throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(files);
+        getMockEndpoint("mock:result").expectsNoDuplicates(body());
+
+        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
+        for (int i = 0; i < files; i++) {
+            final int index = i;
+            executor.submit(new Callable<Object>() {
+                public Object call() throws Exception {
+                    template.sendBody("seda:start", "Message " + index);
+                    return null;
+                }
+            });
+        }
+
+        assertMockEndpointsSatisfied();
+        resetMocks();
+        executor.shutdownNow();
+    }
+    
+    public void startBroker() throws Exception {
+        String brokerName = "test-broker-" + System.currentTimeMillis();
+        String brokerUri = "vm://" + brokerName;
+        broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setBrokerId(brokerName);
+        broker.addConnector(brokerUri);
+        broker.setPersistent(false);
+        broker.start();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        //startBroker();
+        
+        connectionFactory = (PooledConnectionFactory) CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("jms", jmsComponentAutoAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                    .inOut("jms:queue:foo?concurrentConsumers=10&maxConcurrentConsumers=20&recoveryInterval=10")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            String threadName = Thread.currentThread().getName();
+                            synchronized (msgsPerThread) {
+                               AtomicInteger count = msgsPerThread.get(threadName);
+                               if (count == null) {
+                                   count = new AtomicInteger(0);
+                                   msgsPerThread.put(threadName, count);
+                               }
+                               count.incrementAndGet();
+                            }
+                        }
+                    })
+                    .to("mock:result");
+
+                from("jms:queue:foo?concurrentConsumers=20&recoveryInterval=10")
+                    .setBody(simple("Reply >>> ${body}"));
+            }
+        };
+    }
+    
+}



Re: svn commit: r1431152 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java

Posted by Raul Kripalani <ra...@evosent.com>.
Ah, I overlooked this option. I'll fix it in a new commit soon.

Thanks for the report, Claus!

Raúl.

On Thu, Jan 10, 2013 at 12:15 PM, Claus Ibsen <cl...@gmail.com> wrote:

> Hi
>
> I think we should honor the option replyToCacheLevelName, so people
> can configure the cache level.
> Now you hardcoded it to consumer.
>
> Some brokers may not work well with that. So giving end users the
> option to set the replyToCacheLevelName is better.
> The default can still be cache consumer.
>
>
>
> On Thu, Jan 10, 2013 at 12:49 AM,  <ra...@apache.org> wrote:
> > Author: raulk
> > Date: Wed Jan  9 23:49:26 2013
> > New Revision: 1431152
> >
> > URL: http://svn.apache.org/viewvc?rev=1431152&view=rev
> > Log:
> > CAMEL-5865 Enhanced concurrent consumers support for JMS producers using
> Temp Reply Queue for replies
> >
> > Added:
> >
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> > Modified:
> >
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> >
> > Modified:
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> > URL:
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1431152&r1=1431151&r2=1431152&view=diff
> >
> ==============================================================================
> > ---
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> (original)
> > +++
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> Wed Jan  9 23:49:26 2013
> > @@ -16,7 +16,10 @@
> >   */
> >  package org.apache.camel.component.jms.reply;
> >
> > +import java.util.concurrent.atomic.AtomicBoolean;
> > +
> >  import javax.jms.Destination;
> > +import javax.jms.ExceptionListener;
> >  import javax.jms.JMSException;
> >  import javax.jms.Message;
> >  import javax.jms.Session;
> > @@ -37,11 +40,23 @@ import org.springframework.jms.support.d
> >   * @version
> >   */
> >  public class TemporaryQueueReplyManager extends ReplyManagerSupport {
> > -
> > +
> > +    final TemporaryReplyQueueDestinationResolver destResolver = new
> TemporaryReplyQueueDestinationResolver();
> > +
> >      public TemporaryQueueReplyManager(CamelContext camelContext) {
> >          super(camelContext);
> >      }
> >
> > +    @Override
> > +    public Destination getReplyTo() {
> > +        try {
> > +            destResolver.destinationReady();
> > +        } catch (InterruptedException e) {
> > +            log.warn("Interrupted while waiting for JMSReplyTo
> destination refresh", e);
> > +        }
> > +        return super.getReplyTo();
> > +    }
> > +
> >      public String registerReply(ReplyManager replyManager, Exchange
> exchange, AsyncCallback callback,
> >                                  String originalCorrelationId, String
> correlationId, long requestTimeout) {
> >          // add to correlation map
> > @@ -90,15 +105,7 @@ public class TemporaryQueueReplyManager
> >          DefaultMessageListenerContainer answer = new
> DefaultJmsMessageListenerContainer(endpoint);
> >
> >          answer.setDestinationName("temporary");
> > -        answer.setDestinationResolver(new DestinationResolver() {
> > -            public Destination resolveDestinationName(Session session,
> String destinationName,
> > -                                                      boolean
> pubSubDomain) throws JMSException {
> > -                // use a temporary queue to gather the reply message
> > -                TemporaryQueue queue = session.createTemporaryQueue();
> > -                setReplyTo(queue);
> > -                return queue;
> > -            }
> > -        });
> > +        answer.setDestinationResolver(destResolver);
> >          answer.setAutoStartup(true);
> >          if (endpoint.getMaxMessagesPerTask() >= 0) {
> >
>  answer.setMaxMessagesPerTask(endpoint.getMaxMessagesPerTask());
> > @@ -113,6 +120,9 @@ public class TemporaryQueueReplyManager
> >
>  answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers());
> >          }
> >          answer.setConnectionFactory(endpoint.getConnectionFactory());
> > +        // we use CACHE_CONSUMER to cling to the consumer as long as we
> can, since we can only consume
> > +        // msgs from the JMS Connection that created the temp
> destination in the first place
> > +
>  answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
> >          String clientId = endpoint.getClientId();
> >          if (clientId != null) {
> >              clientId += ".CamelReplyManager";
> > @@ -121,11 +131,10 @@ public class TemporaryQueueReplyManager
> >
> >          // we cannot do request-reply over JMS with transaction
> >          answer.setSessionTransacted(false);
> > -
> > +
> >          // other optional properties
> > -        if (endpoint.getExceptionListener() != null) {
> > -
>  answer.setExceptionListener(endpoint.getExceptionListener());
> > -        }
> > +        answer.setExceptionListener(new
> TemporaryReplyQueueExceptionListener(destResolver,
> endpoint.getExceptionListener()));
> > +
> >          if (endpoint.getErrorHandler() != null) {
> >              answer.setErrorHandler(endpoint.getErrorHandler());
> >          } else {
> > @@ -144,8 +153,9 @@ public class TemporaryQueueReplyManager
> >              answer.setTaskExecutor(endpoint.getTaskExecutor());
> >          }
> >
> > -        // setup a bean name which is used ny Spring JMS as the thread
> name
> > -        String name = "TemporaryQueueReplyManager[" +
> answer.getDestinationName() + "]";
> > +        // setup a bean name which is used by Spring JMS as the thread
> name
> > +        // use the name of the request destination
> > +        String name = "TemporaryQueueReplyManager[" +
> endpoint.getDestinationName() + "]";
> >          answer.setBeanName(name);
> >
> >          if (answer.getConcurrentConsumers() > 1) {
> > @@ -156,4 +166,60 @@ public class TemporaryQueueReplyManager
> >          return answer;
> >      }
> >
> > +    private final class TemporaryReplyQueueExceptionListener implements
> ExceptionListener {
> > +        private final TemporaryReplyQueueDestinationResolver
> destResolver;
> > +        private final ExceptionListener delegate;
> > +
> > +        private
> TemporaryReplyQueueExceptionListener(TemporaryReplyQueueDestinationResolver
> destResolver,
> > +                ExceptionListener delegate) {
> > +            this.destResolver = destResolver;
> > +            this.delegate = delegate;
> > +        }
> > +
> > +        @Override
> > +        public void onException(JMSException exception) {
> > +            // capture exceptions, and schedule a refresh of the
> ReplyTo destination
> > +            log.warn("Exception inside the DMLC for Temporary ReplyTo
> Queue for destination " + endpoint.getDestinationName() +
> > +                       ", refreshing ReplyTo destination", exception);
> > +            destResolver.scheduleRefresh();
> > +            // serve as a proxy for any exception listener the user may
> have set explicitly
> > +            if (delegate != null) {
> > +                delegate.onException(exception);
> > +            }
> > +        }
> > +
> > +    }
> > +
> > +    private final class TemporaryReplyQueueDestinationResolver
> implements DestinationResolver {
> > +        private TemporaryQueue queue;
> > +        private AtomicBoolean refreshWanted = new AtomicBoolean(false);
> > +
> > +        public Destination resolveDestinationName(Session session,
> String destinationName, boolean pubSubDomain)
> > +                throws JMSException {
> > +            // use a temporary queue to gather the reply message
> > +            synchronized (refreshWanted) {
> > +                if (queue == null || refreshWanted.compareAndSet(true,
> false)) {
> > +                    queue = session.createTemporaryQueue();
> > +                    setReplyTo(queue);
> > +                    log.debug("Refreshed Temporary ReplyTo Queue. New
> queue: " + queue.getQueueName());
> > +                    refreshWanted.notifyAll();
> > +                }
> > +            }
> > +            return queue;
> > +        }
> > +
> > +        public void scheduleRefresh() {
> > +            refreshWanted.set(true);
> > +        }
> > +
> > +        public void destinationReady() throws InterruptedException {
> > +            if (refreshWanted.get()) {
> > +                synchronized (refreshWanted) {
> > +                    log.debug("Waiting for new Temp ReplyTo destination
> to be assigned to continue");
> > +                    refreshWanted.wait();
> > +                }
> > +            }
> > +        }
> > +    }
> > +
> >  }
> >
> > Added:
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> > URL:
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java?rev=1431152&view=auto
> >
> ==============================================================================
> > ---
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> (added)
> > +++
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> Wed Jan  9 23:49:26 2013
> > @@ -0,0 +1,143 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *      http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +package org.apache.camel.component.jms;
> > +
> > +import java.util.Map;
> > +import java.util.concurrent.Callable;
> > +import java.util.concurrent.ConcurrentHashMap;
> > +import java.util.concurrent.ExecutorService;
> > +import java.util.concurrent.Executors;
> > +import java.util.concurrent.atomic.AtomicInteger;
> > +
> > +import javax.jms.ConnectionFactory;
> > +
> > +import org.apache.activemq.ActiveMQConnectionFactory;
> > +import org.apache.activemq.broker.BrokerService;
> > +import org.apache.activemq.broker.TransportConnection;
> > +import org.apache.activemq.pool.PooledConnectionFactory;
> > +import org.apache.camel.CamelContext;
> > +import org.apache.camel.Exchange;
> > +import org.apache.camel.Processor;
> > +import org.apache.camel.builder.RouteBuilder;
> > +import org.apache.camel.test.junit4.CamelTestSupport;
> > +import org.junit.Test;
> > +
> > +import static
> org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
> > +
> > +/**
> > + * Reliability tests for JMS TempQueue Reply Manager with multiple
> consumers.
> > + * @version
> > + */
> > +public class JmsRequestReplyTempQueueMultipleConsumersTest extends
> CamelTestSupport {
> > +
> > +    private Map<String, AtomicInteger> msgsPerThread = new
> ConcurrentHashMap<String, AtomicInteger>();
> > +    private BrokerService broker;
> > +    private PooledConnectionFactory connectionFactory;
> > +
> > +    @Test
> > +    public void testMultipleConsumingThreads() throws Exception {
> > +        doSendMessages(1000, 5);
> > +        assertTrue("Expected multiple consuming threads, but only
> found: " +  msgsPerThread.keySet().size(),
> > +                msgsPerThread.keySet().size() > 1);
> > +    }
> > +
> > +    @Test
> > +    public void testTempQueueRefreshed() throws Exception {
> > +        doSendMessages(500, 5);
> > +        connectionFactory.clear();
> > +        doSendMessages(100, 5);
> > +        connectionFactory.clear();
> > +        doSendMessages(100, 5);
> > +        connectionFactory.clear();
> > +        doSendMessages(100, 5);
> > +        connectionFactory.clear();
> > +        doSendMessages(100, 5);
> > +        connectionFactory.clear();
> > +        doSendMessages(100, 5);
> > +    }
> > +
> > +    private void doSendMessages(int files, int poolSize) throws
> Exception {
> > +        getMockEndpoint("mock:result").expectedMessageCount(files);
> > +        getMockEndpoint("mock:result").expectsNoDuplicates(body());
> > +
> > +        ExecutorService executor =
> Executors.newFixedThreadPool(poolSize);
> > +        for (int i = 0; i < files; i++) {
> > +            final int index = i;
> > +            executor.submit(new Callable<Object>() {
> > +                public Object call() throws Exception {
> > +                    template.sendBody("seda:start", "Message " + index);
> > +                    return null;
> > +                }
> > +            });
> > +        }
> > +
> > +        assertMockEndpointsSatisfied();
> > +        resetMocks();
> > +        executor.shutdownNow();
> > +    }
> > +
> > +    public void startBroker() throws Exception {
> > +        String brokerName = "test-broker-" + System.currentTimeMillis();
> > +        String brokerUri = "vm://" + brokerName;
> > +        broker = new BrokerService();
> > +        broker.setBrokerName(brokerName);
> > +        broker.setBrokerId(brokerName);
> > +        broker.addConnector(brokerUri);
> > +        broker.setPersistent(false);
> > +        broker.start();
> > +    }
> > +
> > +    protected CamelContext createCamelContext() throws Exception {
> > +        CamelContext camelContext = super.createCamelContext();
> > +        //startBroker();
> > +
> > +        connectionFactory = (PooledConnectionFactory)
> CamelJmsTestHelper.createConnectionFactory();
> > +        camelContext.addComponent("jms",
> jmsComponentAutoAcknowledge(connectionFactory));
> > +
> > +        return camelContext;
> > +    }
> > +
> > +    @Override
> > +    protected RouteBuilder createRouteBuilder() throws Exception {
> > +        return new RouteBuilder() {
> > +            @Override
> > +            public void configure() throws Exception {
> > +                from("seda:start")
> > +
>  .inOut("jms:queue:foo?concurrentConsumers=10&maxConcurrentConsumers=20&recoveryInterval=10")
> > +                    .process(new Processor() {
> > +                        @Override
> > +                        public void process(Exchange exchange) throws
> Exception {
> > +                            String threadName =
> Thread.currentThread().getName();
> > +                            synchronized (msgsPerThread) {
> > +                               AtomicInteger count =
> msgsPerThread.get(threadName);
> > +                               if (count == null) {
> > +                                   count = new AtomicInteger(0);
> > +                                   msgsPerThread.put(threadName, count);
> > +                               }
> > +                               count.incrementAndGet();
> > +                            }
> > +                        }
> > +                    })
> > +                    .to("mock:result");
> > +
> > +
>  from("jms:queue:foo?concurrentConsumers=20&recoveryInterval=10")
> > +                    .setBody(simple("Reply >>> ${body}"));
> > +            }
> > +        };
> > +    }
> > +
> > +}
> >
> >
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> FuseSource is now part of Red Hat
> Email: cibsen@redhat.com
> Web: http://fusesource.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
>

Re: svn commit: r1431152 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

I think we should honor the replyToCacheLevelName option, so that people
can configure the cache level.
Right now it is hardcoded to CACHE_CONSUMER.

Some brokers may not work well with that cache level, so it is better to
give end users the option to set replyToCacheLevelName.
The default can still be CACHE_CONSUMER.



On Thu, Jan 10, 2013 at 12:49 AM,  <ra...@apache.org> wrote:
> Author: raulk
> Date: Wed Jan  9 23:49:26 2013
> New Revision: 1431152
>
> URL: http://svn.apache.org/viewvc?rev=1431152&view=rev
> Log:
> CAMEL-5865 Enhanced concurrent consumers support for JMS producers using Temp Reply Queue for replies
>
> Added:
>     camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> Modified:
>     camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
>
> Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1431152&r1=1431151&r2=1431152&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
> +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Wed Jan  9 23:49:26 2013
> @@ -16,7 +16,10 @@
>   */
>  package org.apache.camel.component.jms.reply;
>
> +import java.util.concurrent.atomic.AtomicBoolean;
> +
>  import javax.jms.Destination;
> +import javax.jms.ExceptionListener;
>  import javax.jms.JMSException;
>  import javax.jms.Message;
>  import javax.jms.Session;
> @@ -37,11 +40,23 @@ import org.springframework.jms.support.d
>   * @version
>   */
>  public class TemporaryQueueReplyManager extends ReplyManagerSupport {
> -
> +
> +    final TemporaryReplyQueueDestinationResolver destResolver = new TemporaryReplyQueueDestinationResolver();
> +
>      public TemporaryQueueReplyManager(CamelContext camelContext) {
>          super(camelContext);
>      }
>
> +    @Override
> +    public Destination getReplyTo() {
> +        try {
> +            destResolver.destinationReady();
> +        } catch (InterruptedException e) {
> +            log.warn("Interrupted while waiting for JMSReplyTo destination refresh", e);
> +        }
> +        return super.getReplyTo();
> +    }
> +
>      public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
>                                  String originalCorrelationId, String correlationId, long requestTimeout) {
>          // add to correlation map
> @@ -90,15 +105,7 @@ public class TemporaryQueueReplyManager
>          DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint);
>
>          answer.setDestinationName("temporary");
> -        answer.setDestinationResolver(new DestinationResolver() {
> -            public Destination resolveDestinationName(Session session, String destinationName,
> -                                                      boolean pubSubDomain) throws JMSException {
> -                // use a temporary queue to gather the reply message
> -                TemporaryQueue queue = session.createTemporaryQueue();
> -                setReplyTo(queue);
> -                return queue;
> -            }
> -        });
> +        answer.setDestinationResolver(destResolver);
>          answer.setAutoStartup(true);
>          if (endpoint.getMaxMessagesPerTask() >= 0) {
>              answer.setMaxMessagesPerTask(endpoint.getMaxMessagesPerTask());
> @@ -113,6 +120,9 @@ public class TemporaryQueueReplyManager
>              answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers());
>          }
>          answer.setConnectionFactory(endpoint.getConnectionFactory());
> +        // we use CACHE_CONSUMER to cling to the consumer as long as we can, since we can only consume
> +        // msgs from the JMS Connection that created the temp destination in the first place
> +        answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
>          String clientId = endpoint.getClientId();
>          if (clientId != null) {
>              clientId += ".CamelReplyManager";
> @@ -121,11 +131,10 @@ public class TemporaryQueueReplyManager
>
>          // we cannot do request-reply over JMS with transaction
>          answer.setSessionTransacted(false);
> -
> +
>          // other optional properties
> -        if (endpoint.getExceptionListener() != null) {
> -            answer.setExceptionListener(endpoint.getExceptionListener());
> -        }
> +        answer.setExceptionListener(new TemporaryReplyQueueExceptionListener(destResolver, endpoint.getExceptionListener()));
> +
>          if (endpoint.getErrorHandler() != null) {
>              answer.setErrorHandler(endpoint.getErrorHandler());
>          } else {
> @@ -144,8 +153,9 @@ public class TemporaryQueueReplyManager
>              answer.setTaskExecutor(endpoint.getTaskExecutor());
>          }
>
> -        // setup a bean name which is used ny Spring JMS as the thread name
> -        String name = "TemporaryQueueReplyManager[" + answer.getDestinationName() + "]";
> +        // setup a bean name which is used by Spring JMS as the thread name
> +        // use the name of the request destination
> +        String name = "TemporaryQueueReplyManager[" + endpoint.getDestinationName() + "]";
>          answer.setBeanName(name);
>
>          if (answer.getConcurrentConsumers() > 1) {
> @@ -156,4 +166,60 @@ public class TemporaryQueueReplyManager
>          return answer;
>      }
>
> +    private final class TemporaryReplyQueueExceptionListener implements ExceptionListener {
> +        private final TemporaryReplyQueueDestinationResolver destResolver;
> +        private final ExceptionListener delegate;
> +
> +        private TemporaryReplyQueueExceptionListener(TemporaryReplyQueueDestinationResolver destResolver,
> +                ExceptionListener delegate) {
> +            this.destResolver = destResolver;
> +            this.delegate = delegate;
> +        }
> +
> +        @Override
> +        public void onException(JMSException exception) {
> +            // capture exceptions, and schedule a refresh of the ReplyTo destination
> +            log.warn("Exception inside the DMLC for Temporary ReplyTo Queue for destination " + endpoint.getDestinationName() +
> +                       ", refreshing ReplyTo destination", exception);
> +            destResolver.scheduleRefresh();
> +            // serve as a proxy for any exception listener the user may have set explicitly
> +            if (delegate != null) {
> +                delegate.onException(exception);
> +            }
> +        }
> +
> +    }
> +
> +    private final class TemporaryReplyQueueDestinationResolver implements DestinationResolver {
> +        private TemporaryQueue queue;
> +        private AtomicBoolean refreshWanted = new AtomicBoolean(false);
> +
> +        public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)
> +                throws JMSException {
> +            // use a temporary queue to gather the reply message
> +            synchronized (refreshWanted) {
> +                if (queue == null || refreshWanted.compareAndSet(true, false)) {
> +                    queue = session.createTemporaryQueue();
> +                    setReplyTo(queue);
> +                    log.debug("Refreshed Temporary ReplyTo Queue. New queue: " + queue.getQueueName());
> +                    refreshWanted.notifyAll();
> +                }
> +            }
> +            return queue;
> +        }
> +
> +        public void scheduleRefresh() {
> +            refreshWanted.set(true);
> +        }
> +
> +        public void destinationReady() throws InterruptedException {
> +            if (refreshWanted.get()) {
> +                synchronized (refreshWanted) {
> +                    log.debug("Waiting for new Temp ReplyTo destination to be assigned to continue");
> +                    refreshWanted.wait();
> +                }
> +            }
> +        }
> +    }
> +
>  }
>
> Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java?rev=1431152&view=auto
> ==============================================================================
> --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java (added)
> +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java Wed Jan  9 23:49:26 2013
> @@ -0,0 +1,143 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.component.jms;
> +
> +import java.util.Map;
> +import java.util.concurrent.Callable;
> +import java.util.concurrent.ConcurrentHashMap;
> +import java.util.concurrent.ExecutorService;
> +import java.util.concurrent.Executors;
> +import java.util.concurrent.atomic.AtomicInteger;
> +
> +import javax.jms.ConnectionFactory;
> +
> +import org.apache.activemq.ActiveMQConnectionFactory;
> +import org.apache.activemq.broker.BrokerService;
> +import org.apache.activemq.broker.TransportConnection;
> +import org.apache.activemq.pool.PooledConnectionFactory;
> +import org.apache.camel.CamelContext;
> +import org.apache.camel.Exchange;
> +import org.apache.camel.Processor;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.test.junit4.CamelTestSupport;
> +import org.junit.Test;
> +
> +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
> +
> +/**
> + * Reliability tests for JMS TempQueue Reply Manager with multiple consumers.
> + * @version
> + */
> +public class JmsRequestReplyTempQueueMultipleConsumersTest extends CamelTestSupport {
> +
> +    private Map<String, AtomicInteger> msgsPerThread = new ConcurrentHashMap<String, AtomicInteger>();
> +    private BrokerService broker;
> +    private PooledConnectionFactory connectionFactory;
> +
> +    @Test
> +    public void testMultipleConsumingThreads() throws Exception {
> +        doSendMessages(1000, 5);
> +        assertTrue("Expected multiple consuming threads, but only found: " +  msgsPerThread.keySet().size(),
> +                msgsPerThread.keySet().size() > 1);
> +    }
> +
> +    @Test
> +    public void testTempQueueRefreshed() throws Exception {
> +        doSendMessages(500, 5);
> +        connectionFactory.clear();
> +        doSendMessages(100, 5);
> +        connectionFactory.clear();
> +        doSendMessages(100, 5);
> +        connectionFactory.clear();
> +        doSendMessages(100, 5);
> +        connectionFactory.clear();
> +        doSendMessages(100, 5);
> +        connectionFactory.clear();
> +        doSendMessages(100, 5);
> +    }
> +
> +    private void doSendMessages(int files, int poolSize) throws Exception {
> +        getMockEndpoint("mock:result").expectedMessageCount(files);
> +        getMockEndpoint("mock:result").expectsNoDuplicates(body());
> +
> +        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
> +        for (int i = 0; i < files; i++) {
> +            final int index = i;
> +            executor.submit(new Callable<Object>() {
> +                public Object call() throws Exception {
> +                    template.sendBody("seda:start", "Message " + index);
> +                    return null;
> +                }
> +            });
> +        }
> +
> +        assertMockEndpointsSatisfied();
> +        resetMocks();
> +        executor.shutdownNow();
> +    }
> +
> +    public void startBroker() throws Exception {
> +        String brokerName = "test-broker-" + System.currentTimeMillis();
> +        String brokerUri = "vm://" + brokerName;
> +        broker = new BrokerService();
> +        broker.setBrokerName(brokerName);
> +        broker.setBrokerId(brokerName);
> +        broker.addConnector(brokerUri);
> +        broker.setPersistent(false);
> +        broker.start();
> +    }
> +
> +    protected CamelContext createCamelContext() throws Exception {
> +        CamelContext camelContext = super.createCamelContext();
> +        //startBroker();
> +
> +        connectionFactory = (PooledConnectionFactory) CamelJmsTestHelper.createConnectionFactory();
> +        camelContext.addComponent("jms", jmsComponentAutoAcknowledge(connectionFactory));
> +
> +        return camelContext;
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                from("seda:start")
> +                    .inOut("jms:queue:foo?concurrentConsumers=10&maxConcurrentConsumers=20&recoveryInterval=10")
> +                    .process(new Processor() {
> +                        @Override
> +                        public void process(Exchange exchange) throws Exception {
> +                            String threadName = Thread.currentThread().getName();
> +                            synchronized (msgsPerThread) {
> +                               AtomicInteger count = msgsPerThread.get(threadName);
> +                               if (count == null) {
> +                                   count = new AtomicInteger(0);
> +                                   msgsPerThread.put(threadName, count);
> +                               }
> +                               count.incrementAndGet();
> +                            }
> +                        }
> +                    })
> +                    .to("mock:result");
> +
> +                from("jms:queue:foo?concurrentConsumers=20&recoveryInterval=10")
> +                    .setBody(simple("Reply >>> ${body}"));
> +            }
> +        };
> +    }
> +
> +}
>
>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
FuseSource is now part of Red Hat
Email: cibsen@redhat.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen