You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/01/26 14:36:58 UTC

[camel] branch main updated: [Camel-6097] Fixes a race condition in AggregateProcessor (camel-core-processor) (#6830)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 314c8b4  [Camel-6097] Fixes a race condition in AggregateProcessor (camel-core-processor) (#6830)
314c8b4 is described below

commit 314c8b4b30dfd3e31bf52fb5d9fb7ff50d74d66a
Author: Benjamin BONNET <be...@m4x.org>
AuthorDate: Wed Jan 26 15:36:14 2022 +0100

    [Camel-6097] Fixes a race condition in AggregateProcessor (camel-core-processor) (#6830)
    
    * CAMEL-4271 : recover task in cluster
    
    * Specializing JdbcAggregationRepository for cluster
    
    * CAMEL-6097 : race condition in recovery/aggregate
    
    * Move cleanup code into a finally block
---
 .../processor/aggregate/AggregateProcessor.java    | 176 +++++++++++----------
 1 file changed, 90 insertions(+), 86 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 9b44c6f..d579ae6 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -17,8 +17,6 @@
 package org.apache.camel.processor.aggregate;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -101,7 +99,6 @@ public class AggregateProcessor extends AsyncProcessorSupport
     public static final String COMPLETED_BY_FORCE = "force";
 
     private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class);
-
     private volatile Lock lock;
     private final AtomicBoolean aggregateRepositoryWarned = new AtomicBoolean();
     private final CamelContext camelContext;
@@ -127,7 +124,8 @@ public class AggregateProcessor extends AsyncProcessorSupport
     private AggregationRepository aggregationRepository;
     private Map<String, String> closedCorrelationKeys;
     private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<>();
-    private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final Set<String> inProgressCompleteExchanges = ConcurrentHashMap.newKeySet();
+    private final Set<String> inProgressCompleteExchangesForRecoveryTask = ConcurrentHashMap.newKeySet();
     private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<>();
 
     private final AggregateProcessorStatistics statistics = new Statistics();
@@ -141,6 +139,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
     private final AtomicLong completedByBatchConsumer = new AtomicLong();
     private final AtomicLong completedByForce = new AtomicLong();
     private final AtomicLong discarded = new AtomicLong();
+    private final AtomicBoolean recoveryInProgress = new AtomicBoolean(false);
 
     // keep booking about redelivery
     private static class RedeliveryData {
@@ -327,7 +326,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
             totalIn.incrementAndGet();
         }
 
-        //check for the special header to force completion of all groups (and ignore the exchange otherwise)
+        // check for the special header to force completion of all groups (and ignore the exchange otherwise)
         if (isCompleteAllGroups(exchange)) {
             removeFlagCompleteAllGroups(exchange);
             forceCompletionOfAllGroups();
@@ -834,7 +833,9 @@ public class AggregateProcessor extends AsyncProcessorSupport
 
         // add this as in progress before we submit the task
         inProgressCompleteExchanges.add(exchange.getExchangeId());
-
+        if (recoveryInProgress.get()) {
+            inProgressCompleteExchangesForRecoveryTask.add(exchange.getExchangeId());
+        }
         // invoke the on completion callback
         aggregationStrategy.onCompletion(exchange);
 
@@ -877,6 +878,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
 
         // send this exchange
         executorService.execute(() -> {
+
             Runnable task = () -> processor.process(exchange, done -> {
                 // log exception if there was a problem
                 if (exchange.getException() != null) {
@@ -1365,98 +1367,100 @@ public class AggregateProcessor extends AsyncProcessorSupport
                 LOG.trace("Recover check cannot start due CamelContext({}) has not been started yet", camelContext.getName());
                 return;
             }
-
             LOG.trace("Starting recover check");
+            try {
+                // copy the current in progress before doing scan
+                recoveryInProgress.set(true);
+                inProgressCompleteExchangesForRecoveryTask.clear();
+                inProgressCompleteExchangesForRecoveryTask.addAll(inProgressCompleteExchanges);
+                final Set<String> exchangeIds = recoverable.scan(camelContext);
+                for (String exchangeId : exchangeIds) {
+
+                    // we may shutdown while doing recovery
+                    if (!isRunAllowed()) {
+                        LOG.info("We are shutting down so stop recovering");
+                        return;
+                    }
+                    lock.lock();
+                    try {
+                        // consider in progress if it was in progress before we did the scan, or currently after we did the scan
+                        // its safer to consider it in progress than risk duplicates due both in progress + recovered
+                        final boolean inProgress = inProgressCompleteExchangesForRecoveryTask.contains(exchangeId);
+                        if (inProgress) {
+                            LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
+                        } else {
+                            LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
+                            Exchange exchange = recoverable.recover(camelContext, exchangeId);
+                            if (exchange != null) {
+                                // get the correlation key
+                                String key = exchange.getProperty(ExchangePropertyKey.AGGREGATED_CORRELATION_KEY, String.class);
+                                // and mark it as redelivered
+                                exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+
+                                // get the current redelivery data
+                                RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
+
+                                // if we are exhausted, then move to dead letter channel
+                                if (data != null && recoverable.getMaximumRedeliveries() > 0
+                                        && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
+                                    LOG.warn("The recovered exchange is exhausted after {} attempts, will now be moved to "
+                                             + "dead letter channel: {}",
+                                            recoverable.getMaximumRedeliveries(), recoverable.getDeadLetterUri());
+
+                                    // send to DLC
+                                    try {
+                                        // set redelivery counter
+                                        exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
+                                        // and prepare for sending to DLC
+                                        exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+                                        exchange.adapt(ExtendedExchange.class).setRollbackOnly(false);
+                                        deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
+                                    } catch (Throwable e) {
+                                        exchange.setException(e);
+                                    }
+
+                                    // handle if failed
+                                    if (exchange.getException() != null) {
+                                        getExceptionHandler()
+                                                .handleException("Failed to move recovered Exchange to dead letter channel: "
+                                                                 + recoverable.getDeadLetterUri(),
+                                                        exchange.getException());
+                                    } else {
+                                        // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
+                                        recoverable.confirm(camelContext, exchangeId);
+                                    }
+                                } else {
+                                    // update current redelivery state
+                                    if (data == null) {
+                                        // create new data
+                                        data = new RedeliveryData();
+                                        redeliveryState.put(exchange.getExchangeId(), data);
+                                    }
+                                    data.redeliveryCounter++;
 
-            // copy the current in progress before doing scan
-            final Set<String> copyOfInProgress = new LinkedHashSet<>(inProgressCompleteExchanges);
-
-            Set<String> exchangeIds = recoverable.scan(camelContext);
-            for (String exchangeId : exchangeIds) {
-
-                // we may shutdown while doing recovery
-                if (!isRunAllowed()) {
-                    LOG.info("We are shutting down so stop recovering");
-                    return;
-                }
-                lock.lock();
-                try {
-                    // consider in progress if it was in progress before we did the scan, or currently after we did the scan
-                    // its safer to consider it in progress than risk duplicates due both in progress + recovered
-                    boolean inProgress
-                            = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
-                    if (inProgress) {
-                        LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
-                    } else {
-                        LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
-                        Exchange exchange = recoverable.recover(camelContext, exchangeId);
-                        if (exchange != null) {
-                            // get the correlation key
-                            String key = exchange.getProperty(ExchangePropertyKey.AGGREGATED_CORRELATION_KEY, String.class);
-                            // and mark it as redelivered
-                            exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
-
-                            // get the current redelivery data
-                            RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
-
-                            // if we are exhausted, then move to dead letter channel
-                            if (data != null && recoverable.getMaximumRedeliveries() > 0
-                                    && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
-                                LOG.warn("The recovered exchange is exhausted after {} attempts, will now be moved to "
-                                         + "dead letter channel: {}",
-                                        recoverable.getMaximumRedeliveries(), recoverable.getDeadLetterUri());
-
-                                // send to DLC
-                                try {
                                     // set redelivery counter
                                     exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
-                                    // and prepare for sending to DLC
-                                    exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
-                                    exchange.adapt(ExtendedExchange.class).setRollbackOnly(false);
-                                    deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
-                                } catch (Throwable e) {
-                                    exchange.setException(e);
-                                }
+                                    if (recoverable.getMaximumRedeliveries() > 0) {
+                                        exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER,
+                                                recoverable.getMaximumRedeliveries());
+                                    }
 
-                                // handle if failed
-                                if (exchange.getException() != null) {
-                                    getExceptionHandler()
-                                            .handleException("Failed to move recovered Exchange to dead letter channel: "
-                                                             + recoverable.getDeadLetterUri(),
-                                                    exchange.getException());
-                                } else {
-                                    // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
-                                    recoverable.confirm(camelContext, exchangeId);
-                                }
-                            } else {
-                                // update current redelivery state
-                                if (data == null) {
-                                    // create new data
-                                    data = new RedeliveryData();
-                                    redeliveryState.put(exchange.getExchangeId(), data);
-                                }
-                                data.redeliveryCounter++;
+                                    LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}",
+                                            data.redeliveryCounter, exchangeId);
 
-                                // set redelivery counter
-                                exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
-                                if (recoverable.getMaximumRedeliveries() > 0) {
-                                    exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER,
-                                            recoverable.getMaximumRedeliveries());
+                                    // not exhaust so resubmit the recovered exchange
+                                    onSubmitCompletion(key, exchange);
                                 }
-
-                                LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}",
-                                        data.redeliveryCounter, exchangeId);
-
-                                // not exhaust so resubmit the recovered exchange
-                                onSubmitCompletion(key, exchange);
                             }
                         }
+                    } finally {
+                        lock.unlock();
                     }
-                } finally {
-                    lock.unlock();
                 }
+            } finally {
+                recoveryInProgress.set(false);
+                inProgressCompleteExchangesForRecoveryTask.clear();
             }
-
             LOG.trace("Recover check complete");
         }
     }