You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/29 11:15:08 UTC

svn commit: r939238 - /camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Author: davsclaus
Date: Thu Apr 29 09:15:08 2010
New Revision: 939238

URL: http://svn.apache.org/viewvc?rev=939238&view=rev
Log:
Use a fair lock in the aggregator so that the timeout checker gets a chance to acquire the lock.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=939238&r1=939237&r2=939238&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Apr 29 09:15:08 2010
@@ -75,7 +75,9 @@ public class AggregateProcessor extends 
 
     private static final Log LOG = LogFactory.getLog(AggregateProcessor.class);
 
-    private final Lock lock = new ReentrantLock();
+    // use a fair lock so timeout checker will have a chance to acquire the lock if
+    // a lot of new messages keep arriving
+    private final Lock lock = new ReentrantLock(true);
     private final CamelContext camelContext;
     private final Processor processor;
     private final AggregationStrategy aggregationStrategy;
@@ -175,8 +177,8 @@ public class AggregateProcessor extends 
         // when memory based then its fast using synchronized, but if the aggregation repository is IO
         // bound such as JPA etc then concurrent aggregation per correlation key could
         // improve performance as we can run aggregation repository get/add in parallel
+        lock.lock();
         try {
-            lock.lock();
             doAggregation(key, exchange);
         } finally {
             lock.unlock();
@@ -551,24 +553,23 @@ public class AggregateProcessor extends 
                 log.debug("Completion timeout triggered for correlation key: " + key);
             }
 
-            try {
-                lock.lock();
-
-                // double check that its not already in progress
-                boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
-                if (inProgress) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Aggregated exchange with id: " + exchangeId + " is already in progress.");
-                    }
-                    return;
+            // double check that its not already in progress
+            boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
+            if (inProgress) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Aggregated exchange with id: " + exchangeId + " is already in progress.");
                 }
+                return;
+            }
 
-                // get the aggregated exchange
-                Exchange answer = aggregationRepository.get(camelContext, key);
+            // get the aggregated exchange
+            Exchange answer = aggregationRepository.get(camelContext, key);
 
-                // indicate it was completed by timeout
-                answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
+            // indicate it was completed by timeout
+            answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
 
+            lock.lock();
+            try {
                 onCompletion(key, answer, true);
             } finally {
                 lock.unlock();
@@ -593,23 +594,25 @@ public class AggregateProcessor extends 
             LOG.trace("Starting completion interval task");
 
             // trigger completion for all in the repository
-            try {
-                lock.lock();
+            Set<String> keys = aggregationRepository.getKeys();
 
-                Set<String> keys = aggregationRepository.getKeys();
-                for (String key : keys) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Completion interval triggered for correlation key: " + key);
-                    }
-                    Exchange exchange = aggregationRepository.get(camelContext, key);
+            if (keys != null && !keys.isEmpty()) {
+                lock.lock();
+                try {
+                    for (String key : keys) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Completion interval triggered for correlation key: " + key);
+                        }
+                        Exchange exchange = aggregationRepository.get(camelContext, key);
 
-                    // indicate it was completed by interval
-                    exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
+                        // indicate it was completed by interval
+                        exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
 
-                    onCompletion(key, exchange, false);
+                        onCompletion(key, exchange, false);
+                    }
+                } finally {
+                    lock.unlock();
                 }
-            } finally {
-                lock.unlock();
             }
 
             LOG.trace("Completion interval task complete");
@@ -703,8 +706,8 @@ public class AggregateProcessor extends 
                                 LOG.debug("Delivery attempt: " + data.redeliveryCounter + " to recover aggregated exchange with id: " + exchangeId + "");
                             }
                             // not exhaust so resubmit the recovered exchange
+                            lock.lock();
                             try {
-                                lock.lock();
                                 onSubmitCompletion(key, exchange);
                             } finally {
                                 lock.unlock();