You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/05 12:18:58 UTC

[camel] branch master updated (55ca107 -> c87b86f)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 55ca107  CAMEL-13792 - Rename components to default names, Camel-rxjava2 to camel-rxjava - Regen
     new 05e545d  Remove not used code
     new ab8a08e  CAMEL-6325: Add discardOnAggregationFailure option to aggregate EIP
     new c87b86f  CAMEL-6325: Add discardOnAggregationFailure option to aggregate EIP

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../processor/aggregate/AggregateProcessor.java    |  62 ++++++---
 .../src/main/docs/eips/aggregate-eip.adoc          |  11 +-
 .../src/main/docs/eips/serviceCall-eip.adoc        |   2 +-
 .../apache/camel/model/AggregateDefinition.java    |  48 ++++++-
 .../org/apache/camel/reifier/AggregateReifier.java |   8 ++
 .../aggregator/AggregateDiscardOnFailureTest.java  | 140 +++++++++++++++++++++
 6 files changed, 248 insertions(+), 23 deletions(-)
 create mode 100644 core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java


[camel] 02/03: CAMEL-6325: Add discardOnAggregationFailure option to aggregate EIP

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ab8a08e8717c860cc2bd6f6b8d4d716d3ee782dc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Aug 5 14:10:08 2019 +0200

    CAMEL-6325: Add discardOnAggregationFailure option to aggregate EIP
---
 .../processor/aggregate/AggregateProcessor.java    |  58 +++++++--
 .../apache/camel/model/AggregateDefinition.java    |  27 +++-
 .../org/apache/camel/reifier/AggregateReifier.java |   8 ++
 .../aggregator/AggregateDiscardOnFailureTest.java  | 140 +++++++++++++++++++++
 4 files changed, 219 insertions(+), 14 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index efb0b2a..c7dbb09 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -229,6 +229,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
     private boolean completionOnNewCorrelationGroup;
     private AtomicInteger batchConsumerCounter = new AtomicInteger();
     private boolean discardOnCompletionTimeout;
+    private boolean discardOnAggregationFailure;
     private boolean forceCompletionOnStop;
     private boolean completeAllOnStop;
     private long completionTimeoutCheckerInterval = 1000;
@@ -477,7 +478,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
         if (preCompletion && complete != null) {
             // need to pre complete the current group before we aggregate
-            doAggregationComplete(complete, list, key, originalExchange, oldExchange);
+            doAggregationComplete(complete, list, key, originalExchange, oldExchange, false);
             // as we complete the current group eager, we should indicate the new group is not complete
             complete = null;
             // and clear old/original exchange as we start on a new group
@@ -490,11 +491,24 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         }
 
         // aggregate the exchanges
+        boolean aggregateFailed = false;
         try {
             answer = onAggregation(oldExchange, newExchange);
         } catch (Throwable e) {
-            // must catch any exception from aggregation
-            throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
+            aggregateFailed = true;
+            if (isDiscardOnAggregationFailure()) {
+                // discard due to a failure in the aggregation strategy
+                log.debug("Aggregation for correlation key {} discarding aggregated exchange: {} due to failure in AggregationStrategy caused by: {}", key, oldExchange, e.getMessage());
+                complete = COMPLETED_BY_STRATEGY;
+                answer = oldExchange;
+                if (answer == null) {
+                    // first message in group failed during aggregation and we should just discard this
+                    return null;
+                }
+            } else {
+                // must catch any exception from aggregation
+                throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
+            }
         }
         if (answer == null) {
             throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange);
@@ -529,19 +543,20 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             }
         }
 
-        if (complete == null) {
+        if (!aggregateFailed && complete == null) {
             // only need to update aggregation repository if we are not complete
             doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
         } else {
             // if we are complete then add the answer to the list
-            doAggregationComplete(complete, list, key, originalExchange, answer);
+            doAggregationComplete(complete, list, key, originalExchange, answer, aggregateFailed);
         }
 
         log.trace("onAggregation +++  end  +++ with correlation key: {}", key);
         return list;
     }
 
-    protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) {
+    protected void doAggregationComplete(String complete, List<Exchange> list, String key,
+                                         Exchange originalExchange, Exchange answer, boolean aggregateFailed) {
         if (COMPLETED_BY_CONSUMER.equals(complete)) {
             for (String batchKey : batchConsumerCorrelationKeys) {
                 Exchange batchAnswer;
@@ -554,7 +569,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
                 if (batchAnswer != null) {
                     batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-                    onCompletion(batchKey, originalExchange, batchAnswer, false);
+                    onCompletion(batchKey, originalExchange, batchAnswer, false, aggregateFailed);
                     list.add(batchAnswer);
                 }
             }
@@ -564,7 +579,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         } else if (answer != null) {
             // we are complete for this exchange
             answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-            answer = onCompletion(key, originalExchange, answer, false);
+            answer = onCompletion(key, originalExchange, answer, false, aggregateFailed);
         }
 
         if (answer != null) {
@@ -685,7 +700,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         return aggregationStrategy.aggregate(oldExchange, newExchange);
     }
 
-    protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
+    protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout, boolean aggregateFailed) {
         // store the correlation key as property before we remove so the repository has that information
         if (original != null) {
             original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
@@ -726,6 +741,15 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             redeliveryState.remove(aggregated.getExchangeId());
             // the completion was from timeout and we should just discard it
             answer = null;
+        } else if (aggregateFailed && isDiscardOnAggregationFailure()) {
+            // discard because the aggregation failed
+            log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
+            // must confirm the discarded exchange
+            aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
+            // and remove redelivery state as well
+            redeliveryState.remove(aggregated.getExchangeId());
+            // the aggregation failed, so we should just discard the aggregated exchange
+            answer = null;
         } else {
             // the aggregated exchange should be published (sent out)
             answer = aggregated;
@@ -1006,6 +1030,14 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         this.discardOnCompletionTimeout = discardOnCompletionTimeout;
     }
 
+    public boolean isDiscardOnAggregationFailure() {
+        return discardOnAggregationFailure;
+    }
+
+    public void setDiscardOnAggregationFailure(boolean discardOnAggregationFailure) {
+        this.discardOnAggregationFailure = discardOnAggregationFailure;
+    }
+
     public void setForceCompletionOnStop(boolean forceCompletionOnStop) {
         this.forceCompletionOnStop = forceCompletionOnStop;
     }
@@ -1165,7 +1197,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
                 // indicate it was completed by timeout
                 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_TIMEOUT);
                 try {
-                    answer = onCompletion(key, answer, answer, true);
+                    answer = onCompletion(key, answer, answer, true, false);
                     if (answer != null) {
                         onSubmitCompletion(key, answer);
                     }
@@ -1213,7 +1245,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
                             // indicate it was completed by interval
                             exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_INTERVAL);
                             try {
-                                Exchange answer = onCompletion(key, exchange, exchange, false);
+                                Exchange answer = onCompletion(key, exchange, exchange, false, false);
                                 if (answer != null) {
                                     onSubmitCompletion(key, answer);
                                 }
@@ -1573,7 +1605,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
                 log.trace("Force completion triggered for correlation key: {}", key);
                 // indicate it was completed by a force completion request
                 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_FORCE);
-                Exchange answer = onCompletion(key, exchange, exchange, false);
+                Exchange answer = onCompletion(key, exchange, exchange, false, false);
                 if (answer != null) {
                     onSubmitCompletion(key, answer);
                 }
@@ -1615,7 +1647,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
                         log.trace("Force completion triggered for correlation key: {}", key);
                         // indicate it was completed by a force completion request
                         exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_FORCE);
-                        Exchange answer = onCompletion(key, exchange, exchange, false);
+                        Exchange answer = onCompletion(key, exchange, exchange, false, false);
                         if (answer != null) {
                             onSubmitCompletion(key, answer);
                         }
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 62c8b13..0392f9d 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -109,6 +109,8 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     @XmlAttribute
     private Boolean discardOnCompletionTimeout;
     @XmlAttribute
+    private Boolean discardOnAggregationFailure;
+    @XmlAttribute
     private Boolean forceCompletionOnStop;
     @XmlAttribute
     private Boolean completeAllOnStop;
@@ -450,7 +452,15 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
         this.discardOnCompletionTimeout = discardOnCompletionTimeout;
     }
-    
+
+    public Boolean getDiscardOnAggregationFailure() {
+        return discardOnAggregationFailure;
+    }
+
+    public void setDiscardOnAggregationFailure(Boolean discardOnAggregationFailure) {
+        this.discardOnAggregationFailure = discardOnAggregationFailure;
+    }
+
     public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
         this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
     }
@@ -555,9 +565,24 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * Discards the aggregated message when aggregation failed (an exception was thrown from {@link AggregationStrategy}).
+     * This means the partly aggregated message is dropped and not sent out of the aggregator.
+     * <p/>
+     * This option cannot be used together with completionFromBatchConsumer.
+     *
+     * @return builder
+     */
+    public AggregateDefinition discardOnAggregationFailure() {
+        setDiscardOnAggregationFailure(true);
+        return this;
+    }
+
+    /**
      * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
      * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
      * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
+     * <p/>
+     * This option cannot be used together with discardOnAggregationFailure.
      *
      * @return builder
      */
diff --git a/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index 4334755..3d00fd0 100644
--- a/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -100,6 +100,11 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
         answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
         answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);
 
+        if (definition.getCompletionFromBatchConsumer() != null && definition.getCompletionFromBatchConsumer()
+                && definition.getDiscardOnAggregationFailure() != null && definition.getDiscardOnAggregationFailure()) {
+            throw new IllegalArgumentException("Cannot use both completionFromBatchConsumer and discardOnAggregationFailure on: " + definition);
+        }
+
         // set other options
         answer.setParallelProcessing(parallel);
         if (definition.getOptimisticLocking() != null) {
@@ -148,6 +153,9 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
         if (definition.getDiscardOnCompletionTimeout() != null) {
             answer.setDiscardOnCompletionTimeout(definition.getDiscardOnCompletionTimeout());
         }
+        if (definition.getDiscardOnAggregationFailure() != null) {
+            answer.setDiscardOnAggregationFailure(definition.getDiscardOnAggregationFailure());
+        }
         if (definition.getForceCompletionOnStop() != null) {
             answer.setForceCompletionOnStop(definition.getForceCompletionOnStop());
         }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java
new file mode 100644
index 0000000..35c5284
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class AggregateDiscardOnFailureTest extends ContextTestSupport {
+
+    @Test
+    public void testAggregateDiscardOnAggregationFailureFirst() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+        mock.assertIsSatisfied();
+
+        // send in new groups, one of them with the same correlation key, which should not fail
+        mock.reset();
+        mock.expectedBodiesReceived("ABC", "DEF");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        template.sendBodyAndHeader("direct:start", "D", "id", 456);
+        template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testAggregateDiscardOnAggregationFailureMiddle() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+        mock.assertIsSatisfied();
+
+        // send in new groups, one of them with the same correlation key, which should not fail
+        mock.reset();
+        mock.expectedBodiesReceived("ABC", "DEF");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        template.sendBodyAndHeader("direct:start", "D", "id", 456);
+        template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testAggregateDiscardOnAggregationFailureLast() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+        mock.assertIsSatisfied();
+
+        // send in new groups, one of them with the same correlation key, which should not fail
+        mock.reset();
+        mock.expectedBodiesReceived("ABC", "DEF");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        template.sendBodyAndHeader("direct:start", "D", "id", 456);
+        template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(3).completionTimeout(2000)
+                        // and if an exception happens in aggregate then discard the message
+                        .discardOnAggregationFailure()
+                        .to("mock:aggregated");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+    private class MyAggregationStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if ("Kaboom".equals(newExchange.getMessage().getBody())) {
+                throw new IllegalArgumentException("Forced");
+            }
+
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            Object body = oldExchange.getMessage().getBody(String.class) + newExchange.getMessage().getBody(String.class);
+            oldExchange.getMessage().setBody(body);
+            return oldExchange;
+        }
+    }
+}


[camel] 03/03: CAMEL-6325: Add discardOnAggregationFailure option to aggregate EIP

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c87b86f4c2ed7fb82441fff5e3124f8c37fe246e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Aug 5 14:16:40 2019 +0200

    CAMEL-6325: Add discardOnAggregationFailure option to aggregate EIP
---
 .../src/main/docs/eips/aggregate-eip.adoc           | 11 +++++++----
 .../src/main/docs/eips/serviceCall-eip.adoc         |  2 +-
 .../org/apache/camel/model/AggregateDefinition.java | 21 +++++++++++++++++++++
 3 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/core/camel-core/src/main/docs/eips/aggregate-eip.adoc b/core/camel-core/src/main/docs/eips/aggregate-eip.adoc
index ab15c75..365ed17 100644
--- a/core/camel-core/src/main/docs/eips/aggregate-eip.adoc
+++ b/core/camel-core/src/main/docs/eips/aggregate-eip.adoc
@@ -17,15 +17,15 @@ single correlation key into a single message exchange.
 == Aggregator options
 
 // eip options: START
-The Aggregate EIP supports 24 options which are listed below:
+The Aggregate EIP supports 27 options which are listed below:
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
 | *correlationExpression* | *Required* The expression used to calculate the correlation key to use for aggregation. The Exchange which has the same correlation key is aggregated together. If the correlation key could not be evaluated an Exception is thrown. You can disable this by using the ignoreBadCorrelationKeys option. |  | NamespaceAware Expression
 | *completionPredicate* | A Predicate to indicate when an aggregated exchange is complete. If this is not specified and the AggregationStrategy object implements Predicate, the aggregationStrategy object will be used as the completionPredicate. |  | NamespaceAware Expression
-| *completionTimeout* | Time in millis that an aggregated exchange should be inactive before its complete (timeout). This option can be set as either a fixed value or using an Expression which allows you to evaluate a timeout dynamically - will use Long as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. You cannot use this option together with completionInterval, only one of the two can be used. By default the timeout checker run [...]
-| *completionSize* | Number of messages aggregated before the aggregation is complete. This option can be set as either a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. |  | Integer
+| *completionTimeoutExpression* | Time in millis that an aggregated exchange should be inactive before its complete (timeout). This option can be set as either a fixed value or using an Expression which allows you to evaluate a timeout dynamically - will use Long as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. You cannot use this option together with completionInterval, only one of the two can be used. By default the timeout c [...]
+| *completionSizeExpression* | Number of messages aggregated before the aggregation is complete. This option can be set as either a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. |  | NamespaceAware Expression
 | *optimisticLockRetryPolicy* | Allows to configure retry settings when using optimistic locking. |  | OptimisticLockRetry PolicyDefinition
 | *parallelProcessing* | When aggregated are completed they are being send out of the aggregator. This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads. | false | Boolean
 | *optimisticLocking* | Turns on using optimistic locking, which requires the aggregationRepository being used, is supporting this by implementing org.apache.camel.spi.OptimisticLockingAggregationRepository. | false | Boolean
@@ -35,14 +35,17 @@ The Aggregate EIP supports 24 options which are listed below:
 | *strategyRef* | A reference to lookup the AggregationStrategy in the Registry. Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges. At first call the oldExchange parameter is null. On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange. |  | String
 | *strategyMethodName* | This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. |  | String
 | *strategyMethodAllowNull* | If this option is false then the aggregate method is not used for the very first aggregation. If this option is true then null values is used as the oldExchange (at the very first aggregation), when using POJOs as the AggregationStrategy. | false | Boolean
+| *completionSize* | Number of messages aggregated before the aggregation is complete. This option can be set as either a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. |  | Integer
 | *completionInterval* | A repeating period in millis by which the aggregator will complete all current aggregated exchanges. Camel has a background task which is triggered every period. You cannot use this option together with completionTimeout, only one of them can be used. |  | Long
+| *completionTimeout* | Time in millis that an aggregated exchange should be inactive before its complete (timeout). This option can be set as either a fixed value or using an Expression which allows you to evaluate a timeout dynamically - will use Long as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. You cannot use this option together with completionInterval, only one of the two can be used. By default the timeout checker run [...]
 | *completionTimeoutChecker Interval* | Interval in millis that is used by the background task that checks for timeouts (org.apache.camel.TimeoutMap). By default the timeout checker runs every second. The timeout is an approximation and there is no guarantee that a timeout is triggered exactly after the timeout value. It is not recommended to use very low timeout values or checker intervals. | 1000 | Long
-| *completionFromBatchConsumer* | Enables the batch completion mode where we aggregate from a org.apache.camel.BatchConsumer and aggregate the total number of exchanges the org.apache.camel.BatchConsumer has reported as total by checking the exchange property org.apache.camel.Exchange#BATCH_COMPLETE when its complete. | false | Boolean
+| *completionFromBatchConsumer* | Enables the batch completion mode where we aggregate from a org.apache.camel.BatchConsumer and aggregate the total number of exchanges the org.apache.camel.BatchConsumer has reported as total by checking the exchange property org.apache.camel.Exchange#BATCH_COMPLETE when its complete. This option cannot be used together with discardOnAggregationFailure. | false | Boolean
 | *completionOnNewCorrelation Group* | Enables completion on all previous groups when a new incoming correlation group. This can for example be used to complete groups with same correlation keys when they are in consecutive order. Notice when this is enabled then only 1 correlation group can be in progress as when a new correlation group starts, then the previous groups is forced completed. | false | Boolean
 | *eagerCheckCompletion* | Use eager completion checking which means that the completionPredicate will use the incoming Exchange. As opposed to without eager completion checking the completionPredicate will use the aggregated Exchange. | false | Boolean
 | *ignoreInvalidCorrelation Keys* | If a correlation key cannot be successfully evaluated it will be ignored by logging a DEBUG and then just ignore the incoming Exchange. | false | Boolean
 | *closeCorrelationKeyOn Completion* | Closes a correlation key when its complete. Any late received exchange whose correlation key has been closed will be denied and a ClosedCorrelationKeyException is thrown. |  | Integer
 | *discardOnCompletionTimeout* | Discards the aggregated message on completion timeout. This means on timeout the aggregated message is dropped and not sent out of the aggregator. | false | Boolean
+| *discardOnAggregationFailure* | Discards the aggregated message when aggregation failed (an exception was thrown from AggregationStrategy). This means the partly aggregated message is dropped and not sent out of the aggregator. This option cannot be used together with completionFromBatchConsumer. | false | Boolean
 | *forceCompletionOnStop* | Indicates to complete all current aggregated exchanges when the context is stopped | false | Boolean
 | *completeAllOnStop* | Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped. This also means that we will wait for all pending exchanges which are stored in the aggregation repository to complete so the repository is empty before we can stop. You may want to enable this when using the memory based aggregation repository that is memory based only, and do not store data on disk. When this option is enabled, then the aggregator is [...]
 | *aggregateControllerRef* | To use a org.apache.camel.processor.aggregate.AggregateController to allow external sources to control this aggregator. |  | String
diff --git a/core/camel-core/src/main/docs/eips/serviceCall-eip.adoc b/core/camel-core/src/main/docs/eips/serviceCall-eip.adoc
index 6db6a1a..221d775 100644
--- a/core/camel-core/src/main/docs/eips/serviceCall-eip.adoc
+++ b/core/camel-core/src/main/docs/eips/serviceCall-eip.adoc
@@ -100,7 +100,7 @@ The Service Call EIP supports 14 options which are listed below:
 | Name | Description | Default | Type
 | *name* | *Required* Sets the name of the service to use |  | String
 | *uri* | The uri of the endpoint to send to. The uri can be dynamic computed using the org.apache.camel.language.simple.SimpleLanguage expression. |  | String
-| *component* | The component to use. | http4 | String
+| *component* | The component to use. | http | String
 | *pattern* | Sets the optional ExchangePattern used to invoke this endpoint |  | ExchangePattern
 | *configurationRef* | Refers to a ServiceCall configuration to use |  | String
 | *serviceDiscoveryRef* | Sets a reference to a custom ServiceDiscovery to use. |  | String
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 0392f9d..2fe4b5c 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -341,6 +341,20 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         return completionTimeoutExpression;
     }
 
+    /**
+     * Time in millis that an aggregated exchange should be inactive before it is completed (timeout).
+     * This option can be set as either a fixed value or using an Expression which allows you to evaluate
+     * a timeout dynamically - will use Long as result.
+     * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
+     * You cannot use this option together with completionInterval, only one of the two can be used.
+     * <p/>
+     * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option
+     * to configure how frequently to run the checker.
+     * The timeout is an approximation and there is no guarantee that a timeout is triggered exactly after the timeout value.
+     * It is not recommended to use very low timeout values or checker intervals.
+     *
+     * @param completionTimeoutExpression  the timeout as an {@link Expression} which is evaluated as a {@link Long} type
+     */
     public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) {
         this.completionTimeoutExpression = completionTimeoutExpression;
     }
@@ -349,6 +363,13 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         return completionSizeExpression;
     }
 
+    /**
+     * Number of messages aggregated before the aggregation is complete. This option can be set as either
+     * a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result.
+     * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
+     *
+     * @param completionSizeExpression  the completion size as an {@link org.apache.camel.Expression} which is evaluated as an {@link Integer} type
+     */
     public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) {
         this.completionSizeExpression = completionSizeExpression;
     }


[camel] 01/03: Remove not used code

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 05e545d7e0cd3058a796b9586394f66769caf5ad
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Aug 5 11:50:24 2019 +0200

    Remove not used code
---
 .../java/org/apache/camel/processor/aggregate/AggregateProcessor.java | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index f9756bd..efb0b2a 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -685,10 +685,6 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         return aggregationStrategy.aggregate(oldExchange, newExchange);
     }
 
-    protected boolean onPreCompletionAggregation(Exchange oldExchange, Exchange newExchange) {
-        return aggregationStrategy.preComplete(oldExchange, newExchange);
-    }
-
     protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
         // store the correlation key as property before we remove so the repository has that information
         if (original != null) {