You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/05 12:19:00 UTC

[camel] 02/03: CAMEL-6325: Add discardOnAggregationFailure option to aggregate EIP

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ab8a08e8717c860cc2bd6f6b8d4d716d3ee782dc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Aug 5 14:10:08 2019 +0200

    CAMEL-6325: Add discardOnAggregationFailure option to aggregate EIP
---
 .../processor/aggregate/AggregateProcessor.java    |  58 +++++++--
 .../apache/camel/model/AggregateDefinition.java    |  27 +++-
 .../org/apache/camel/reifier/AggregateReifier.java |   8 ++
 .../aggregator/AggregateDiscardOnFailureTest.java  | 140 +++++++++++++++++++++
 4 files changed, 219 insertions(+), 14 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index efb0b2a..c7dbb09 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -229,6 +229,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
     private boolean completionOnNewCorrelationGroup;
     private AtomicInteger batchConsumerCounter = new AtomicInteger();
     private boolean discardOnCompletionTimeout;
+    private boolean discardOnAggregationFailure;
     private boolean forceCompletionOnStop;
     private boolean completeAllOnStop;
     private long completionTimeoutCheckerInterval = 1000;
@@ -477,7 +478,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
         if (preCompletion && complete != null) {
             // need to pre complete the current group before we aggregate
-            doAggregationComplete(complete, list, key, originalExchange, oldExchange);
+            doAggregationComplete(complete, list, key, originalExchange, oldExchange, false);
             // as we complete the current group eager, we should indicate the new group is not complete
             complete = null;
             // and clear old/original exchange as we start on a new group
@@ -490,11 +491,24 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         }
 
         // aggregate the exchanges
+        boolean aggregateFailed = false;
         try {
             answer = onAggregation(oldExchange, newExchange);
         } catch (Throwable e) {
-            // must catch any exception from aggregation
-            throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
+            aggregateFailed = true;
+            if (isDiscardOnAggregationFailure()) {
+                // discard due failure in aggregation strategy
+                log.debug("Aggregation for correlation key {} discarding aggregated exchange: {} due to failure in AggregationStrategy caused by: {}", key, oldExchange, e.getMessage());
+                complete = COMPLETED_BY_STRATEGY;
+                answer = oldExchange;
+                if (answer == null) {
+                    // first message in group failed during aggregation and we should just discard this
+                    return null;
+                }
+            } else {
+                // must catch any exception from aggregation
+                throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
+            }
         }
         if (answer == null) {
             throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange);
@@ -529,19 +543,20 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             }
         }
 
-        if (complete == null) {
+        if (!aggregateFailed && complete == null) {
             // only need to update aggregation repository if we are not complete
             doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
         } else {
             // if we are complete then add the answer to the list
-            doAggregationComplete(complete, list, key, originalExchange, answer);
+            doAggregationComplete(complete, list, key, originalExchange, answer, aggregateFailed);
         }
 
         log.trace("onAggregation +++  end  +++ with correlation key: {}", key);
         return list;
     }
 
-    protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) {
+    protected void doAggregationComplete(String complete, List<Exchange> list, String key,
+                                         Exchange originalExchange, Exchange answer, boolean aggregateFailed) {
         if (COMPLETED_BY_CONSUMER.equals(complete)) {
             for (String batchKey : batchConsumerCorrelationKeys) {
                 Exchange batchAnswer;
@@ -554,7 +569,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
                 if (batchAnswer != null) {
                     batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-                    onCompletion(batchKey, originalExchange, batchAnswer, false);
+                    onCompletion(batchKey, originalExchange, batchAnswer, false, aggregateFailed);
                     list.add(batchAnswer);
                 }
             }
@@ -564,7 +579,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         } else if (answer != null) {
             // we are complete for this exchange
             answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-            answer = onCompletion(key, originalExchange, answer, false);
+            answer = onCompletion(key, originalExchange, answer, false, aggregateFailed);
         }
 
         if (answer != null) {
@@ -685,7 +700,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         return aggregationStrategy.aggregate(oldExchange, newExchange);
     }
 
-    protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
+    protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout, boolean aggregateFailed) {
         // store the correlation key as property before we remove so the repository has that information
         if (original != null) {
             original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
@@ -726,6 +741,15 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             redeliveryState.remove(aggregated.getExchangeId());
             // the completion was from timeout and we should just discard it
             answer = null;
+        } else if (aggregateFailed && isDiscardOnAggregationFailure()) {
+            // discard due aggregation failed
+            log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
+            // must confirm the discarded exchange
+            aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
+            // and remove redelivery state as well
+            redeliveryState.remove(aggregated.getExchangeId());
+            // the completion was failed during aggregation and we should just discard it
+            answer = null;
         } else {
             // the aggregated exchange should be published (sent out)
             answer = aggregated;
@@ -1006,6 +1030,14 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         this.discardOnCompletionTimeout = discardOnCompletionTimeout;
     }
 
+    public boolean isDiscardOnAggregationFailure() {
+        return discardOnAggregationFailure;
+    }
+
+    public void setDiscardOnAggregationFailure(boolean discardOnAggregationFailure) {
+        this.discardOnAggregationFailure = discardOnAggregationFailure;
+    }
+
     public void setForceCompletionOnStop(boolean forceCompletionOnStop) {
         this.forceCompletionOnStop = forceCompletionOnStop;
     }
@@ -1165,7 +1197,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
                 // indicate it was completed by timeout
                 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_TIMEOUT);
                 try {
-                    answer = onCompletion(key, answer, answer, true);
+                    answer = onCompletion(key, answer, answer, true, false);
                     if (answer != null) {
                         onSubmitCompletion(key, answer);
                     }
@@ -1213,7 +1245,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
                             // indicate it was completed by interval
                             exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_INTERVAL);
                             try {
-                                Exchange answer = onCompletion(key, exchange, exchange, false);
+                                Exchange answer = onCompletion(key, exchange, exchange, false, false);
                                 if (answer != null) {
                                     onSubmitCompletion(key, answer);
                                 }
@@ -1573,7 +1605,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
                 log.trace("Force completion triggered for correlation key: {}", key);
                 // indicate it was completed by a force completion request
                 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_FORCE);
-                Exchange answer = onCompletion(key, exchange, exchange, false);
+                Exchange answer = onCompletion(key, exchange, exchange, false, false);
                 if (answer != null) {
                     onSubmitCompletion(key, answer);
                 }
@@ -1615,7 +1647,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
                         log.trace("Force completion triggered for correlation key: {}", key);
                         // indicate it was completed by a force completion request
                         exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_FORCE);
-                        Exchange answer = onCompletion(key, exchange, exchange, false);
+                        Exchange answer = onCompletion(key, exchange, exchange, false, false);
                         if (answer != null) {
                             onSubmitCompletion(key, answer);
                         }
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 62c8b13..0392f9d 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -109,6 +109,8 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     @XmlAttribute
     private Boolean discardOnCompletionTimeout;
     @XmlAttribute
+    private Boolean discardOnAggregationFailure;
+    @XmlAttribute
     private Boolean forceCompletionOnStop;
     @XmlAttribute
     private Boolean completeAllOnStop;
@@ -450,7 +452,15 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
         this.discardOnCompletionTimeout = discardOnCompletionTimeout;
     }
-    
+
+    public Boolean getDiscardOnAggregationFailure() {
+        return discardOnAggregationFailure;
+    }
+
+    public void setDiscardOnAggregationFailure(Boolean discardOnAggregationFailure) {
+        this.discardOnAggregationFailure = discardOnAggregationFailure;
+    }
+
     public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
         this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
     }
@@ -555,9 +565,24 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * Discards the aggregated message when aggregation failed (an exception was thrown from {@link AggregationStrategy}.
+     * This means the partly aggregated message is dropped and not sent out of the aggregator.
+     * <p/>
+     * This option cannot be used together with completionFromBatchConsumer.
+     *
+     * @return builder
+     */
+    public AggregateDefinition discardOnAggregationFailure() {
+        setDiscardOnAggregationFailure(true);
+        return this;
+    }
+
+    /**
      * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
      * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
      * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
+     * <p/>
+     * This option cannot be used together with discardOnAggregationFailure.
      *
      * @return builder
      */
diff --git a/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index 4334755..3d00fd0 100644
--- a/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -100,6 +100,11 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
         answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
         answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);
 
+        if (definition.getCompletionFromBatchConsumer() != null && definition.getCompletionFromBatchConsumer()
+                && definition.getDiscardOnAggregationFailure() != null && definition.getDiscardOnAggregationFailure()) {
+            throw new IllegalArgumentException("Cannot use both completionFromBatchConsumer and discardOnAggregationFailure on: " + definition);
+        }
+
         // set other options
         answer.setParallelProcessing(parallel);
         if (definition.getOptimisticLocking() != null) {
@@ -148,6 +153,9 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
         if (definition.getDiscardOnCompletionTimeout() != null) {
             answer.setDiscardOnCompletionTimeout(definition.getDiscardOnCompletionTimeout());
         }
+        if (definition.getDiscardOnAggregationFailure() != null) {
+            answer.setDiscardOnAggregationFailure(definition.getDiscardOnAggregationFailure());
+        }
         if (definition.getForceCompletionOnStop() != null) {
             answer.setForceCompletionOnStop(definition.getForceCompletionOnStop());
         }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java
new file mode 100644
index 0000000..35c5284
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class AggregateDiscardOnFailureTest extends ContextTestSupport {
+
+    @Test
+    public void testAggregateDiscardOnAggregationFailureFirst() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+        mock.assertIsSatisfied();
+
+        // send in a new group's with same correlation key but should not fail
+        mock.reset();
+        mock.expectedBodiesReceived("ABC", "DEF");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        template.sendBodyAndHeader("direct:start", "D", "id", 456);
+        template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testAggregateDiscardOnAggregationFailureMiddle() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+        mock.assertIsSatisfied();
+
+        // send in a new group's with same correlation key but should not fail
+        mock.reset();
+        mock.expectedBodiesReceived("ABC", "DEF");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        template.sendBodyAndHeader("direct:start", "D", "id", 456);
+        template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testAggregateDiscardOnAggregationFailureLast() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+        mock.assertIsSatisfied();
+
+        // send in a new group's with same correlation key but should not fail
+        mock.reset();
+        mock.expectedBodiesReceived("ABC", "DEF");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        template.sendBodyAndHeader("direct:start", "D", "id", 456);
+        template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(3).completionTimeout(2000)
+                        // and if an exception happens in aggregate then discard the message
+                        .discardOnAggregationFailure()
+                        .to("mock:aggregated");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+    private class MyAggregationStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if ("Kaboom".equals(newExchange.getMessage().getBody())) {
+                throw new IllegalArgumentException("Forced");
+            }
+
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            Object body = oldExchange.getMessage().getBody(String.class) + newExchange.getMessage().getBody(String.class);
+            oldExchange.getMessage().setBody(body);
+            return oldExchange;
+        }
+    }
+}