You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ra...@apache.org on 2012/11/05 00:12:14 UTC

svn commit: r1405662 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/support/ components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/

Author: raulk
Date: Sun Nov  4 23:12:13 2012
New Revision: 1405662

URL: http://svn.apache.org/viewvc?rev=1405662&view=rev
Log:
Fixed CAMEL-5769: Camel JMS producer can block a thread under specific circumstances

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsBlockedAsyncRoutingEngineTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java?rev=1405662&r1=1405661&r2=1405662&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java Sun Nov  4 23:12:13 2012
@@ -165,6 +165,10 @@ public class DefaultTimeoutMap<K, V> ext
 
     public void purge() {
         log.trace("There are {} in the timeout map", map.size());
+        if (map.isEmpty()) {
+            return;
+        }
+        
         long now = currentTime();
 
         List<TimeoutMapEntry<K, V>> expired = new ArrayList<TimeoutMapEntry<K, V>>();
@@ -200,7 +204,13 @@ public class DefaultTimeoutMap<K, V> ext
                 try {
                     // now fire eviction notification
                     for (TimeoutMapEntry<K, V> entry : expired) {
-                        boolean evict = onEviction(entry.getKey(), entry.getValue());
+                        boolean evict = false;
+                        try {
+                            evict = onEviction(entry.getKey(), entry.getValue());
+                        } catch (Throwable t) {
+                            log.warn("Exception happened during eviction of entry ID {}, won't evict and will continue trying: ", 
+                                    entry.getValue(), t);
+                        }
                         if (evict) {
                             // okay this entry should be evicted
                             evicts.add(entry.getKey());

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1405662&r1=1405661&r2=1405662&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Sun Nov  4 23:12:13 2012
@@ -53,7 +53,9 @@ public class TemporaryQueueReplyManager 
         log.trace("Updated provisional correlationId [{}] to expected correlationId [{}]", correlationId, newCorrelationId);
 
         ReplyHandler handler = correlation.remove(correlationId);
-        correlation.put(newCorrelationId, handler, requestTimeout);
+        if (handler != null) {
+            correlation.put(newCorrelationId, handler, requestTimeout);
+        }
     }
 
     @Override

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsBlockedAsyncRoutingEngineTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsBlockedAsyncRoutingEngineTest.java?rev=1405662&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsBlockedAsyncRoutingEngineTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsBlockedAsyncRoutingEngineTest.java Sun Nov  4 23:12:13 2012
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.issues;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.Message;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * Tests CAMEL-5769.
+ * Camel JMS producer can block a thread under specific circumstances.
+ *
+ */
+public class JmsBlockedAsyncRoutingEngineTest extends CamelTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsBlockedAsyncRoutingEngineTest.class);
+    private BrokerService broker;
+    private final CountDownLatch latch = new CountDownLatch(5);
+    private final Synchronization callback = new Synchronization() {
+        @Override
+        public void onFailure(Exchange exchange) {
+            LOG.info(">>>> Callback onFailure");
+            latch.countDown();
+        }
+        
+        @Override
+        public void onComplete(Exchange exchange) {
+            LOG.info(">>>> Callback onComplete");
+            latch.countDown();
+        }
+    };
+    
+    public void startBroker() throws Exception {
+        String brokerName = "test-broker-" + System.currentTimeMillis();
+        String brokerUri = "vm://" + brokerName;
+        broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setBrokerId(brokerName);
+        broker.addConnector(brokerUri);
+        broker.setPersistent(false);
+        // This Broker Plugin simulates Producer Flow Control by delaying the broker's ACK of the first message by 2 seconds
+        broker.setPlugins(new BrokerPlugin[] {new DelayerBrokerPlugin()});
+        broker.start();
+    }
+    
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        startBroker();
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+        return camelContext;
+    }
+    
+    @Test
+    public void testBlockedAsyncRoutingEngineTest() throws Exception {
+        // 0. This message takes 2000ms to ACK from the broker due to the DelayerBrokerPlugin
+        // Until then, the correlation ID doesn't get updated locally
+        try {
+            template.asyncRequestBody("activemq:queue:test?requestTimeout=500&useMessageIDAsCorrelationID=true", "hello");
+        } catch (Exception e) { }
+        
+        // 1. We wait a bit for the CorrelationTimeoutMap purge process to run
+        Thread.sleep(3000);
+        
+        // 2. We send 5 messages that each take 2 seconds to process, so that they exceed the 500ms request timeout
+        template.asyncCallbackRequestBody("activemq:queue:test?requestTimeout=500&useMessageIDAsCorrelationID=true", "beSlow", callback);
+        template.asyncCallbackRequestBody("activemq:queue:test?requestTimeout=500&useMessageIDAsCorrelationID=true", "beSlow", callback);
+        template.asyncCallbackRequestBody("activemq:queue:test?requestTimeout=500&useMessageIDAsCorrelationID=true", "beSlow", callback);
+        template.asyncCallbackRequestBody("activemq:queue:test?requestTimeout=500&useMessageIDAsCorrelationID=true", "beSlow", callback);
+        template.asyncCallbackRequestBody("activemq:queue:test?requestTimeout=500&useMessageIDAsCorrelationID=true", "beSlow", callback);
+
+        // 3. We assert that we were notified of all timeout exceptions
+        assertTrue(latch.await(3000, TimeUnit.MILLISECONDS));
+    }
+    
+    @After
+    public void cleanup() {
+        LOG.info(">>>>> Latch countdown count was: " + latch.getCount());
+    }
+    
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("activemq:queue:test?concurrentConsumers=5&useMessageIDAsCorrelationID=true&transacted=true")
+                    .filter().simple("${in.body} = 'beSlow'")
+                        .delay(constant(2000))
+                    .log(">>>>> Received message on test queue")
+                    .setBody(constant("Reply"))
+                    .log(">>>>> Sending back reply");
+                
+            }
+        };
+    }
+    
+    @SuppressWarnings("unchecked")
+    private class DelayerBrokerPlugin extends BrokerPluginSupport {
+        int i;
+
+        @Override
+        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+            String destinationName = messageSend.getDestination().getPhysicalName();
+            LOG.info("******** Received message for destination " + destinationName);
+            
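+            // only delay the first message sent to a destination whose name contains "test" (other destinations, e.g. the DLQ, pass straight through)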
+            if (destinationName.toLowerCase().contains("test") && i == 0) {
+                Thread.sleep(2000);
+                LOG.info("******** Waited 2 seconds for destination: " + destinationName);
+                i++;
+            }
+
+            super.send(producerExchange, messageSend);
+        }
+
+    }
+    
+}