You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/05 12:19:00 UTC
[camel] 02/03: CAMEL-6325: Add discardOnAggregationFailure option
to aggregate EIP
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit ab8a08e8717c860cc2bd6f6b8d4d716d3ee782dc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Aug 5 14:10:08 2019 +0200
CAMEL-6325: Add discardOnAggregationFailure option to aggregate EIP
---
.../processor/aggregate/AggregateProcessor.java | 58 +++++++--
.../apache/camel/model/AggregateDefinition.java | 27 +++-
.../org/apache/camel/reifier/AggregateReifier.java | 8 ++
.../aggregator/AggregateDiscardOnFailureTest.java | 140 +++++++++++++++++++++
4 files changed, 219 insertions(+), 14 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index efb0b2a..c7dbb09 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -229,6 +229,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
private boolean completionOnNewCorrelationGroup;
private AtomicInteger batchConsumerCounter = new AtomicInteger();
private boolean discardOnCompletionTimeout;
+ private boolean discardOnAggregationFailure;
private boolean forceCompletionOnStop;
private boolean completeAllOnStop;
private long completionTimeoutCheckerInterval = 1000;
@@ -477,7 +478,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
if (preCompletion && complete != null) {
// need to pre complete the current group before we aggregate
- doAggregationComplete(complete, list, key, originalExchange, oldExchange);
+ doAggregationComplete(complete, list, key, originalExchange, oldExchange, false);
// as we complete the current group eager, we should indicate the new group is not complete
complete = null;
// and clear old/original exchange as we start on a new group
@@ -490,11 +491,24 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
}
// aggregate the exchanges
+ boolean aggregateFailed = false;
try {
answer = onAggregation(oldExchange, newExchange);
} catch (Throwable e) {
- // must catch any exception from aggregation
- throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
+ aggregateFailed = true;
+ if (isDiscardOnAggregationFailure()) {
+ // discard due failure in aggregation strategy
+ log.debug("Aggregation for correlation key {} discarding aggregated exchange: {} due to failure in AggregationStrategy caused by: {}", key, oldExchange, e.getMessage());
+ complete = COMPLETED_BY_STRATEGY;
+ answer = oldExchange;
+ if (answer == null) {
+ // first message in group failed during aggregation and we should just discard this
+ return null;
+ }
+ } else {
+ // must catch any exception from aggregation
+ throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
+ }
}
if (answer == null) {
throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange);
@@ -529,19 +543,20 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
}
}
- if (complete == null) {
+ if (!aggregateFailed && complete == null) {
// only need to update aggregation repository if we are not complete
doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
} else {
// if we are complete then add the answer to the list
- doAggregationComplete(complete, list, key, originalExchange, answer);
+ doAggregationComplete(complete, list, key, originalExchange, answer, aggregateFailed);
}
log.trace("onAggregation +++ end +++ with correlation key: {}", key);
return list;
}
- protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) {
+ protected void doAggregationComplete(String complete, List<Exchange> list, String key,
+ Exchange originalExchange, Exchange answer, boolean aggregateFailed) {
if (COMPLETED_BY_CONSUMER.equals(complete)) {
for (String batchKey : batchConsumerCorrelationKeys) {
Exchange batchAnswer;
@@ -554,7 +569,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
if (batchAnswer != null) {
batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
- onCompletion(batchKey, originalExchange, batchAnswer, false);
+ onCompletion(batchKey, originalExchange, batchAnswer, false, aggregateFailed);
list.add(batchAnswer);
}
}
@@ -564,7 +579,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
} else if (answer != null) {
// we are complete for this exchange
answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
- answer = onCompletion(key, originalExchange, answer, false);
+ answer = onCompletion(key, originalExchange, answer, false, aggregateFailed);
}
if (answer != null) {
@@ -685,7 +700,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
return aggregationStrategy.aggregate(oldExchange, newExchange);
}
- protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
+ protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout, boolean aggregateFailed) {
// store the correlation key as property before we remove so the repository has that information
if (original != null) {
original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
@@ -726,6 +741,15 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
redeliveryState.remove(aggregated.getExchangeId());
// the completion was from timeout and we should just discard it
answer = null;
+ } else if (aggregateFailed && isDiscardOnAggregationFailure()) {
+ // discard due aggregation failed
+ log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
+ // must confirm the discarded exchange
+ aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
+ // and remove redelivery state as well
+ redeliveryState.remove(aggregated.getExchangeId());
+ // the completion was failed during aggregation and we should just discard it
+ answer = null;
} else {
// the aggregated exchange should be published (sent out)
answer = aggregated;
@@ -1006,6 +1030,14 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
this.discardOnCompletionTimeout = discardOnCompletionTimeout;
}
+ public boolean isDiscardOnAggregationFailure() {
+ return discardOnAggregationFailure;
+ }
+
+ public void setDiscardOnAggregationFailure(boolean discardOnAggregationFailure) {
+ this.discardOnAggregationFailure = discardOnAggregationFailure;
+ }
+
public void setForceCompletionOnStop(boolean forceCompletionOnStop) {
this.forceCompletionOnStop = forceCompletionOnStop;
}
@@ -1165,7 +1197,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
// indicate it was completed by timeout
answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_TIMEOUT);
try {
- answer = onCompletion(key, answer, answer, true);
+ answer = onCompletion(key, answer, answer, true, false);
if (answer != null) {
onSubmitCompletion(key, answer);
}
@@ -1213,7 +1245,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
// indicate it was completed by interval
exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_INTERVAL);
try {
- Exchange answer = onCompletion(key, exchange, exchange, false);
+ Exchange answer = onCompletion(key, exchange, exchange, false, false);
if (answer != null) {
onSubmitCompletion(key, answer);
}
@@ -1573,7 +1605,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
log.trace("Force completion triggered for correlation key: {}", key);
// indicate it was completed by a force completion request
exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_FORCE);
- Exchange answer = onCompletion(key, exchange, exchange, false);
+ Exchange answer = onCompletion(key, exchange, exchange, false, false);
if (answer != null) {
onSubmitCompletion(key, answer);
}
@@ -1615,7 +1647,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
log.trace("Force completion triggered for correlation key: {}", key);
// indicate it was completed by a force completion request
exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_FORCE);
- Exchange answer = onCompletion(key, exchange, exchange, false);
+ Exchange answer = onCompletion(key, exchange, exchange, false, false);
if (answer != null) {
onSubmitCompletion(key, answer);
}
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 62c8b13..0392f9d 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -109,6 +109,8 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
@XmlAttribute
private Boolean discardOnCompletionTimeout;
@XmlAttribute
+ private Boolean discardOnAggregationFailure;
+ @XmlAttribute
private Boolean forceCompletionOnStop;
@XmlAttribute
private Boolean completeAllOnStop;
@@ -450,7 +452,15 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
this.discardOnCompletionTimeout = discardOnCompletionTimeout;
}
-
+
+ public Boolean getDiscardOnAggregationFailure() {
+ return discardOnAggregationFailure;
+ }
+
+ public void setDiscardOnAggregationFailure(Boolean discardOnAggregationFailure) {
+ this.discardOnAggregationFailure = discardOnAggregationFailure;
+ }
+
public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
}
@@ -555,9 +565,24 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
}
/**
+ * Discards the aggregated message when aggregation failed (an exception was thrown from {@link AggregationStrategy}.
+ * This means the partly aggregated message is dropped and not sent out of the aggregator.
+ * <p/>
+ * This option cannot be used together with completionFromBatchConsumer.
+ *
+ * @return builder
+ */
+ public AggregateDefinition discardOnAggregationFailure() {
+ setDiscardOnAggregationFailure(true);
+ return this;
+ }
+
+ /**
* Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
* and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
* as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
+ * <p/>
+ * This option cannot be used together with discardOnAggregationFailure.
*
* @return builder
*/
diff --git a/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index 4334755..3d00fd0 100644
--- a/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -100,6 +100,11 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);
+ if (definition.getCompletionFromBatchConsumer() != null && definition.getCompletionFromBatchConsumer()
+ && definition.getDiscardOnAggregationFailure() != null && definition.getDiscardOnAggregationFailure()) {
+ throw new IllegalArgumentException("Cannot use both completionFromBatchConsumer and discardOnAggregationFailure on: " + definition);
+ }
+
// set other options
answer.setParallelProcessing(parallel);
if (definition.getOptimisticLocking() != null) {
@@ -148,6 +153,9 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
if (definition.getDiscardOnCompletionTimeout() != null) {
answer.setDiscardOnCompletionTimeout(definition.getDiscardOnCompletionTimeout());
}
+ if (definition.getDiscardOnAggregationFailure() != null) {
+ answer.setDiscardOnAggregationFailure(definition.getDiscardOnAggregationFailure());
+ }
if (definition.getForceCompletionOnStop() != null) {
answer.setForceCompletionOnStop(definition.getForceCompletionOnStop());
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java
new file mode 100644
index 0000000..35c5284
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class AggregateDiscardOnFailureTest extends ContextTestSupport {
+
+ @Test
+ public void testAggregateDiscardOnAggregationFailureFirst() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+ mock.assertIsSatisfied();
+
+ // send in a new group's with same correlation key but should not fail
+ mock.reset();
+ mock.expectedBodiesReceived("ABC", "DEF");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+ template.sendBodyAndHeader("direct:start", "D", "id", 456);
+ template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void testAggregateDiscardOnAggregationFailureMiddle() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+ mock.assertIsSatisfied();
+
+ // send in a new group's with same correlation key but should not fail
+ mock.reset();
+ mock.expectedBodiesReceived("ABC", "DEF");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+ template.sendBodyAndHeader("direct:start", "D", "id", 456);
+ template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void testAggregateDiscardOnAggregationFailureLast() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+ mock.assertIsSatisfied();
+
+ // send in a new group's with same correlation key but should not fail
+ mock.reset();
+ mock.expectedBodiesReceived("ABC", "DEF");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+ template.sendBodyAndHeader("direct:start", "D", "id", 456);
+ template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+ mock.assertIsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .completionSize(3).completionTimeout(2000)
+ // and if an exception happens in aggregate then discard the message
+ .discardOnAggregationFailure()
+ .to("mock:aggregated");
+ // END SNIPPET: e1
+ }
+ };
+ }
+
+ private class MyAggregationStrategy implements AggregationStrategy {
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if ("Kaboom".equals(newExchange.getMessage().getBody())) {
+ throw new IllegalArgumentException("Forced");
+ }
+
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ Object body = oldExchange.getMessage().getBody(String.class) + newExchange.getMessage().getBody(String.class);
+ oldExchange.getMessage().setBody(body);
+ return oldExchange;
+ }
+ }
+}