You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/29 11:15:08 UTC
svn commit: r939238 -
/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Author: davsclaus
Date: Thu Apr 29 09:15:08 2010
New Revision: 939238
URL: http://svn.apache.org/viewvc?rev=939238&view=rev
Log:
Using fair lock in aggregator to give timeout checker a chance.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=939238&r1=939237&r2=939238&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Apr 29 09:15:08 2010
@@ -75,7 +75,9 @@ public class AggregateProcessor extends
private static final Log LOG = LogFactory.getLog(AggregateProcessor.class);
- private final Lock lock = new ReentrantLock();
+ // use a fair lock so timeout checker will have a chance to acquire the lock if
+ // a lot of new messages keep arriving
+ private final Lock lock = new ReentrantLock(true);
private final CamelContext camelContext;
private final Processor processor;
private final AggregationStrategy aggregationStrategy;
@@ -175,8 +177,8 @@ public class AggregateProcessor extends
// when memory based then its fast using synchronized, but if the aggregation repository is IO
// bound such as JPA etc then concurrent aggregation per correlation key could
// improve performance as we can run aggregation repository get/add in parallel
+ lock.lock();
try {
- lock.lock();
doAggregation(key, exchange);
} finally {
lock.unlock();
@@ -551,24 +553,23 @@ public class AggregateProcessor extends
log.debug("Completion timeout triggered for correlation key: " + key);
}
- try {
- lock.lock();
-
- // double check that its not already in progress
- boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
- if (inProgress) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Aggregated exchange with id: " + exchangeId + " is already in progress.");
- }
- return;
+ // double check that its not already in progress
+ boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
+ if (inProgress) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Aggregated exchange with id: " + exchangeId + " is already in progress.");
}
+ return;
+ }
- // get the aggregated exchange
- Exchange answer = aggregationRepository.get(camelContext, key);
+ // get the aggregated exchange
+ Exchange answer = aggregationRepository.get(camelContext, key);
- // indicate it was completed by timeout
- answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
+ // indicate it was completed by timeout
+ answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
+ lock.lock();
+ try {
onCompletion(key, answer, true);
} finally {
lock.unlock();
@@ -593,23 +594,25 @@ public class AggregateProcessor extends
LOG.trace("Starting completion interval task");
// trigger completion for all in the repository
- try {
- lock.lock();
+ Set<String> keys = aggregationRepository.getKeys();
- Set<String> keys = aggregationRepository.getKeys();
- for (String key : keys) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Completion interval triggered for correlation key: " + key);
- }
- Exchange exchange = aggregationRepository.get(camelContext, key);
+ if (keys != null && !keys.isEmpty()) {
+ lock.lock();
+ try {
+ for (String key : keys) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Completion interval triggered for correlation key: " + key);
+ }
+ Exchange exchange = aggregationRepository.get(camelContext, key);
- // indicate it was completed by interval
- exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
+ // indicate it was completed by interval
+ exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
- onCompletion(key, exchange, false);
+ onCompletion(key, exchange, false);
+ }
+ } finally {
+ lock.unlock();
}
- } finally {
- lock.unlock();
}
LOG.trace("Completion interval task complete");
@@ -703,8 +706,8 @@ public class AggregateProcessor extends
LOG.debug("Delivery attempt: " + data.redeliveryCounter + " to recover aggregated exchange with id: " + exchangeId + "");
}
// not exhaust so resubmit the recovered exchange
+ lock.lock();
try {
- lock.lock();
onSubmitCompletion(key, exchange);
} finally {
lock.unlock();