You are viewing a plain text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@camel.apache.org by da...@apache.org on 2022/01/26 14:36:58 UTC
[camel] branch main updated: [Camel-6097] Fixes a race condition in AggregateProcessor (camel-core-processor) (#6830)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 314c8b4 [Camel-6097] Fixes a race condition in AggregateProcessor (camel-core-processor) (#6830)
314c8b4 is described below
commit 314c8b4b30dfd3e31bf52fb5d9fb7ff50d74d66a
Author: Benjamin BONNET <be...@m4x.org>
AuthorDate: Wed Jan 26 15:36:14 2022 +0100
[Camel-6097] Fixes a race condition in AggregateProcessor (camel-core-processor) (#6830)
* CAMEL-4271 : recover task in cluster
* Specializing JdbcAggregationRepository for cluster
* CAMEL-6097 : race condition in recovery/aggregate
* Move cleanup code into a finally block
---
.../processor/aggregate/AggregateProcessor.java | 176 +++++++++++----------
1 file changed, 90 insertions(+), 86 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 9b44c6f..d579ae6 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -17,8 +17,6 @@
package org.apache.camel.processor.aggregate;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -101,7 +99,6 @@ public class AggregateProcessor extends AsyncProcessorSupport
public static final String COMPLETED_BY_FORCE = "force";
private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class);
-
private volatile Lock lock;
private final AtomicBoolean aggregateRepositoryWarned = new AtomicBoolean();
private final CamelContext camelContext;
@@ -127,7 +124,8 @@ public class AggregateProcessor extends AsyncProcessorSupport
private AggregationRepository aggregationRepository;
private Map<String, String> closedCorrelationKeys;
private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<>();
- private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Set<String> inProgressCompleteExchanges = ConcurrentHashMap.newKeySet();
+ private final Set<String> inProgressCompleteExchangesForRecoveryTask = ConcurrentHashMap.newKeySet();
private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<>();
private final AggregateProcessorStatistics statistics = new Statistics();
@@ -141,6 +139,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
private final AtomicLong completedByBatchConsumer = new AtomicLong();
private final AtomicLong completedByForce = new AtomicLong();
private final AtomicLong discarded = new AtomicLong();
+ private final AtomicBoolean recoveryInProgress = new AtomicBoolean(false);
// keep booking about redelivery
private static class RedeliveryData {
@@ -327,7 +326,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
totalIn.incrementAndGet();
}
- //check for the special header to force completion of all groups (and ignore the exchange otherwise)
+ // check for the special header to force completion of all groups (and ignore the exchange otherwise)
if (isCompleteAllGroups(exchange)) {
removeFlagCompleteAllGroups(exchange);
forceCompletionOfAllGroups();
@@ -834,7 +833,9 @@ public class AggregateProcessor extends AsyncProcessorSupport
// add this as in progress before we submit the task
inProgressCompleteExchanges.add(exchange.getExchangeId());
-
+ if (recoveryInProgress.get()) {
+ inProgressCompleteExchangesForRecoveryTask.add(exchange.getExchangeId());
+ }
// invoke the on completion callback
aggregationStrategy.onCompletion(exchange);
@@ -877,6 +878,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
// send this exchange
executorService.execute(() -> {
+
Runnable task = () -> processor.process(exchange, done -> {
// log exception if there was a problem
if (exchange.getException() != null) {
@@ -1365,98 +1367,100 @@ public class AggregateProcessor extends AsyncProcessorSupport
LOG.trace("Recover check cannot start due CamelContext({}) has not been started yet", camelContext.getName());
return;
}
-
LOG.trace("Starting recover check");
+ try {
+ // copy the current in progress before doing scan
+ recoveryInProgress.set(true);
+ inProgressCompleteExchangesForRecoveryTask.clear();
+ inProgressCompleteExchangesForRecoveryTask.addAll(inProgressCompleteExchanges);
+ final Set<String> exchangeIds = recoverable.scan(camelContext);
+ for (String exchangeId : exchangeIds) {
+
+ // we may shutdown while doing recovery
+ if (!isRunAllowed()) {
+ LOG.info("We are shutting down so stop recovering");
+ return;
+ }
+ lock.lock();
+ try {
+ // consider in progress if it was in progress before we did the scan, or currently after we did the scan
+ // its safer to consider it in progress than risk duplicates due both in progress + recovered
+ final boolean inProgress = inProgressCompleteExchangesForRecoveryTask.contains(exchangeId);
+ if (inProgress) {
+ LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
+ } else {
+ LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
+ Exchange exchange = recoverable.recover(camelContext, exchangeId);
+ if (exchange != null) {
+ // get the correlation key
+ String key = exchange.getProperty(ExchangePropertyKey.AGGREGATED_CORRELATION_KEY, String.class);
+ // and mark it as redelivered
+ exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+
+ // get the current redelivery data
+ RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
+
+ // if we are exhausted, then move to dead letter channel
+ if (data != null && recoverable.getMaximumRedeliveries() > 0
+ && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
+ LOG.warn("The recovered exchange is exhausted after {} attempts, will now be moved to "
+ + "dead letter channel: {}",
+ recoverable.getMaximumRedeliveries(), recoverable.getDeadLetterUri());
+
+ // send to DLC
+ try {
+ // set redelivery counter
+ exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
+ // and prepare for sending to DLC
+ exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+ exchange.adapt(ExtendedExchange.class).setRollbackOnly(false);
+ deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+
+ // handle if failed
+ if (exchange.getException() != null) {
+ getExceptionHandler()
+ .handleException("Failed to move recovered Exchange to dead letter channel: "
+ + recoverable.getDeadLetterUri(),
+ exchange.getException());
+ } else {
+ // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
+ recoverable.confirm(camelContext, exchangeId);
+ }
+ } else {
+ // update current redelivery state
+ if (data == null) {
+ // create new data
+ data = new RedeliveryData();
+ redeliveryState.put(exchange.getExchangeId(), data);
+ }
+ data.redeliveryCounter++;
- // copy the current in progress before doing scan
- final Set<String> copyOfInProgress = new LinkedHashSet<>(inProgressCompleteExchanges);
-
- Set<String> exchangeIds = recoverable.scan(camelContext);
- for (String exchangeId : exchangeIds) {
-
- // we may shutdown while doing recovery
- if (!isRunAllowed()) {
- LOG.info("We are shutting down so stop recovering");
- return;
- }
- lock.lock();
- try {
- // consider in progress if it was in progress before we did the scan, or currently after we did the scan
- // its safer to consider it in progress than risk duplicates due both in progress + recovered
- boolean inProgress
- = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
- if (inProgress) {
- LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
- } else {
- LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
- Exchange exchange = recoverable.recover(camelContext, exchangeId);
- if (exchange != null) {
- // get the correlation key
- String key = exchange.getProperty(ExchangePropertyKey.AGGREGATED_CORRELATION_KEY, String.class);
- // and mark it as redelivered
- exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
-
- // get the current redelivery data
- RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
-
- // if we are exhausted, then move to dead letter channel
- if (data != null && recoverable.getMaximumRedeliveries() > 0
- && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
- LOG.warn("The recovered exchange is exhausted after {} attempts, will now be moved to "
- + "dead letter channel: {}",
- recoverable.getMaximumRedeliveries(), recoverable.getDeadLetterUri());
-
- // send to DLC
- try {
// set redelivery counter
exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
- // and prepare for sending to DLC
- exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
- exchange.adapt(ExtendedExchange.class).setRollbackOnly(false);
- deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
- } catch (Throwable e) {
- exchange.setException(e);
- }
+ if (recoverable.getMaximumRedeliveries() > 0) {
+ exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER,
+ recoverable.getMaximumRedeliveries());
+ }
- // handle if failed
- if (exchange.getException() != null) {
- getExceptionHandler()
- .handleException("Failed to move recovered Exchange to dead letter channel: "
- + recoverable.getDeadLetterUri(),
- exchange.getException());
- } else {
- // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
- recoverable.confirm(camelContext, exchangeId);
- }
- } else {
- // update current redelivery state
- if (data == null) {
- // create new data
- data = new RedeliveryData();
- redeliveryState.put(exchange.getExchangeId(), data);
- }
- data.redeliveryCounter++;
+ LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}",
+ data.redeliveryCounter, exchangeId);
- // set redelivery counter
- exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
- if (recoverable.getMaximumRedeliveries() > 0) {
- exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER,
- recoverable.getMaximumRedeliveries());
+ // not exhaust so resubmit the recovered exchange
+ onSubmitCompletion(key, exchange);
}
-
- LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}",
- data.redeliveryCounter, exchangeId);
-
- // not exhaust so resubmit the recovered exchange
- onSubmitCompletion(key, exchange);
}
}
+ } finally {
+ lock.unlock();
}
- } finally {
- lock.unlock();
}
+ } finally {
+ recoveryInProgress.set(false);
+ inProgressCompleteExchangesForRecoveryTask.clear();
}
-
LOG.trace("Recover check complete");
}
}