You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/08/24 09:36:23 UTC

[2/7] camel git commit: CAMEL-8010:Locking the critical section to avoid race condition if AggregateTimeOutChecker also completes at the same time as Recover task

CAMEL-8010:Locking the critical section to avoid race condition if AggregateTimeOutChecker also completes at the same time as Recover task


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/655c771c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/655c771c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/655c771c

Branch: refs/heads/master
Commit: 655c771c330f2ce404b4515d3c649da7b8a22a35
Parents: 8163a8f
Author: Rajithamol <rl...@mediaocean.com>
Authored: Wed Aug 16 14:10:30 2017 -0400
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Thu Aug 24 11:18:46 2017 +0200

----------------------------------------------------------------------
 .../processor/aggregate/AggregateProcessor.java | 113 ++++++++++---------
 1 file changed, 61 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/655c771c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 50978a0..6d2c5a3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1223,68 +1223,77 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                     LOG.info("We are shutting down so stop recovering");
                     return;
                 }
+                if(!optimisticLocking){
+                    lock.lock();
+                }
+                try {
+                    // consider in progress if it was in progress before we did the scan, or currently after we did the scan
+                    // its safer to consider it in progress than risk duplicates due both in progress + recovered
+                    boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
+                    if (inProgress) {
+                        LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
+                    } else {
+                        LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
+                        Exchange exchange = recoverable.recover(camelContext, exchangeId);
+                        if (exchange != null) {
+                            // get the correlation key
+                            String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
+                            // and mark it as redelivered
+                            exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+
+                            // get the current redelivery data
+                            RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
+
+                            // if we are exhausted, then move to dead letter channel
+                            if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
+                                LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries()
+                                        + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
+
+                                // send to DLC
+                                try {
+                                    // set redelivery counter
+                                    exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
+                                    exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+                                    deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
+                                } catch (Throwable e) {
+                                    exchange.setException(e);
+                                }
 
-                // consider in progress if it was in progress before we did the scan, or currently after we did the scan
-                // its safer to consider it in progress than risk duplicates due both in progress + recovered
-                boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
-                if (inProgress) {
-                    LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
-                } else {
-                    LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
-                    Exchange exchange = recoverable.recover(camelContext, exchangeId);
-                    if (exchange != null) {
-                        // get the correlation key
-                        String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
-                        // and mark it as redelivered
-                        exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
-
-                        // get the current redelivery data
-                        RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
-
-                        // if we are exhausted, then move to dead letter channel
-                        if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
-                            LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries()
-                                    + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
+                                // handle if failed
+                                if (exchange.getException() != null) {
+                                    getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException());
+                                } else {
+                                    // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
+                                    recoverable.confirm(camelContext, exchangeId);
+                                }
+                            } else {
+                                // update current redelivery state
+                                if (data == null) {
+                                    // create new data
+                                    data = new RedeliveryData();
+                                    redeliveryState.put(exchange.getExchangeId(), data);
+                                }
+                                data.redeliveryCounter++;
 
-                            // send to DLC
-                            try {
                                 // set redelivery counter
                                 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
-                                exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
-                                deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
-                            } catch (Throwable e) {
-                                exchange.setException(e);
-                            }
+                                if (recoverable.getMaximumRedeliveries() > 0) {
+                                    exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries());
+                                }
 
-                            // handle if failed
-                            if (exchange.getException() != null) {
-                                getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException());
-                            } else {
-                                // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
-                                recoverable.confirm(camelContext, exchangeId);
-                            }
-                        } else {
-                            // update current redelivery state
-                            if (data == null) {
-                                // create new data
-                                data = new RedeliveryData();
-                                redeliveryState.put(exchange.getExchangeId(), data);
-                            }
-                            data.redeliveryCounter++;
+                                LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId);
 
-                            // set redelivery counter
-                            exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
-                            if (recoverable.getMaximumRedeliveries() > 0) {
-                                exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries());
+                                // not exhaust so resubmit the recovered exchange
+                                onSubmitCompletion(key, exchange);
                             }
-
-                            LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId);
-
-                            // not exhaust so resubmit the recovered exchange
-                            onSubmitCompletion(key, exchange);
                         }
                     }
                 }
+                finally {
+                    if(!optimisticLocking){
+                        lock.unlock();
+                    }
+                }
             }
 
             LOG.trace("Recover check complete");