You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/08/24 09:36:23 UTC
[2/7] camel git commit: CAMEL-8010:Locking the critical section to
avoid race condition if AggregateTimeOutChecker also completes at the same
time as Recover task
CAMEL-8010:Locking the critical section to avoid race condition if AggregateTimeOutChecker also completes at the same time as Recover task
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/655c771c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/655c771c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/655c771c
Branch: refs/heads/master
Commit: 655c771c330f2ce404b4515d3c649da7b8a22a35
Parents: 8163a8f
Author: Rajithamol <rl...@mediaocean.com>
Authored: Wed Aug 16 14:10:30 2017 -0400
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Thu Aug 24 11:18:46 2017 +0200
----------------------------------------------------------------------
.../processor/aggregate/AggregateProcessor.java | 113 ++++++++++---------
1 file changed, 61 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/655c771c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 50978a0..6d2c5a3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1223,68 +1223,77 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
LOG.info("We are shutting down so stop recovering");
return;
}
+ if(!optimisticLocking){
+ lock.lock();
+ }
+ try {
+ // consider in progress if it was in progress before we did the scan, or currently after we did the scan
+ // its safer to consider it in progress than risk duplicates due both in progress + recovered
+ boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
+ if (inProgress) {
+ LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
+ } else {
+ LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
+ Exchange exchange = recoverable.recover(camelContext, exchangeId);
+ if (exchange != null) {
+ // get the correlation key
+ String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
+ // and mark it as redelivered
+ exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+
+ // get the current redelivery data
+ RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
+
+ // if we are exhausted, then move to dead letter channel
+ if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
+ LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries()
+ + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
+
+ // send to DLC
+ try {
+ // set redelivery counter
+ exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
+ exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+ deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
- // consider in progress if it was in progress before we did the scan, or currently after we did the scan
- // its safer to consider it in progress than risk duplicates due both in progress + recovered
- boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
- if (inProgress) {
- LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
- } else {
- LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
- Exchange exchange = recoverable.recover(camelContext, exchangeId);
- if (exchange != null) {
- // get the correlation key
- String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
- // and mark it as redelivered
- exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
-
- // get the current redelivery data
- RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
-
- // if we are exhausted, then move to dead letter channel
- if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
- LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries()
- + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
+ // handle if failed
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException());
+ } else {
+ // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
+ recoverable.confirm(camelContext, exchangeId);
+ }
+ } else {
+ // update current redelivery state
+ if (data == null) {
+ // create new data
+ data = new RedeliveryData();
+ redeliveryState.put(exchange.getExchangeId(), data);
+ }
+ data.redeliveryCounter++;
- // send to DLC
- try {
// set redelivery counter
exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
- exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
- deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
- } catch (Throwable e) {
- exchange.setException(e);
- }
+ if (recoverable.getMaximumRedeliveries() > 0) {
+ exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries());
+ }
- // handle if failed
- if (exchange.getException() != null) {
- getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException());
- } else {
- // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
- recoverable.confirm(camelContext, exchangeId);
- }
- } else {
- // update current redelivery state
- if (data == null) {
- // create new data
- data = new RedeliveryData();
- redeliveryState.put(exchange.getExchangeId(), data);
- }
- data.redeliveryCounter++;
+ LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId);
- // set redelivery counter
- exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
- if (recoverable.getMaximumRedeliveries() > 0) {
- exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries());
+ // not exhaust so resubmit the recovered exchange
+ onSubmitCompletion(key, exchange);
}
-
- LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId);
-
- // not exhaust so resubmit the recovered exchange
- onSubmitCompletion(key, exchange);
}
}
}
+ finally {
+ if(!optimisticLocking){
+ lock.unlock();
+ }
+ }
}
LOG.trace("Recover check complete");