You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/18 07:35:44 UTC

svn commit: r955841 - /camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Author: davsclaus
Date: Fri Jun 18 05:35:44 2010
New Revision: 955841

URL: http://svn.apache.org/viewvc?rev=955841&view=rev
Log:
CAMEL-2824: Fixed potential deadlock in aggregator when using timeout completion.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=955841&r1=955840&r2=955841&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Jun 18 05:35:44 2010
@@ -576,12 +576,12 @@ public class AggregateProcessor extends 
             // indicate it was completed by timeout
             answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
 
-            lock.lock();
-            try {
-                onCompletion(key, answer, true);
-            } finally {
-                lock.unlock();
-            }
+            // do not acquire locks here, as we already hold a lock on the timeout map
+            // and want to avoid a deadlock with another thread (currently aggregating)
+            // that wants to put into the timeout map as well (CAMEL-2824).
+            // The on completion logic can run concurrently; it is only the aggregation logic
+            // which is preferred to run non-concurrently.
+            onCompletion(key, answer, true);
         }
     }
 
@@ -713,13 +713,9 @@ public class AggregateProcessor extends 
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Delivery attempt: " + data.redeliveryCounter + " to recover aggregated exchange with id: " + exchangeId + "");
                             }
+
                             // not exhaust so resubmit the recovered exchange
-                            lock.lock();
-                            try {
-                                onSubmitCompletion(key, exchange);
-                            } finally {
-                                lock.unlock();
-                            }
+                            onSubmitCompletion(key, exchange);
                         }
                     }
                 }