You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/01/01 15:00:39 UTC

[1/2] camel git commit: CAMEL-10272: Provide an option to stop further processing when an exception is thrown from an aggregation strategy while parallelProcessing is used.

Repository: camel
Updated Branches:
  refs/heads/master 7ee49fe05 -> 3a227f2c4


CAMEL-10272: Provide an option to stop further processing when an exception is thrown from an aggregation strategy while parallelProcessing is used.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/43036a4a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/43036a4a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/43036a4a

Branch: refs/heads/master
Commit: 43036a4a574873feaa421abbb5393f865714d790
Parents: 7ee49fe
Author: aldettinger <al...@gmail.com>
Authored: Tue Dec 27 10:55:18 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Jan 1 15:21:10 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/RecipientList.java    |  1 +
 .../apache/camel/component/bean/MethodInfo.java |  1 +
 .../apache/camel/model/MulticastDefinition.java | 27 +++++++-
 .../camel/model/RecipientListDefinition.java    | 26 ++++++++
 .../org/apache/camel/model/SplitDefinition.java | 27 +++++++-
 .../camel/processor/MulticastProcessor.java     | 27 ++++++--
 .../apache/camel/processor/RecipientList.java   | 12 +++-
 .../camel/processor/RecipientListProcessor.java | 11 +++-
 .../org/apache/camel/processor/Splitter.java    | 18 ++++--
 .../aggregate/AggregationStrategy.java          |  2 +-
 ...ggregationStrategyThrowingExceptionTest.java | 68 ++++++++++++++++++++
 ...ggregationStrategyThrowingExceptionTest.java | 66 +++++++++++++++++++
 ...ggregationStrategyThrowingExceptionTest.java | 66 +++++++++++++++++++
 .../camel/management/ManagedMulticastTest.java  |  2 +-
 .../management/ManagedRecipientListTest.java    |  2 +-
 .../camel/management/ManagedSplitterTest.java   |  2 +-
 16 files changed, 338 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/RecipientList.java b/camel-core/src/main/java/org/apache/camel/RecipientList.java
index 7cd9cda..bd5e996 100644
--- a/camel-core/src/main/java/org/apache/camel/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/RecipientList.java
@@ -48,6 +48,7 @@ public @interface RecipientList {
     boolean parallelProcessing() default false;
     boolean parallelAggregate() default false;
     boolean stopOnException() default false;
+    boolean stopOnAggregateException() default false;
     boolean streaming() default false;
     boolean ignoreInvalidEndpoints() default false;
     String strategyRef() default "";

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
index 5ee4b46..3e5a314 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
@@ -163,6 +163,7 @@ public class MethodInfo {
                 && matchContext(recipientListAnnotation.context())) {
             recipientList = new RecipientList(camelContext, recipientListAnnotation.delimiter());
             recipientList.setStopOnException(recipientListAnnotation.stopOnException());
+            recipientList.setStopOnAggregateException(recipientListAnnotation.stopOnAggregateException());
             recipientList.setIgnoreInvalidEndpoints(recipientListAnnotation.ignoreInvalidEndpoints());
             recipientList.setParallelProcessing(recipientListAnnotation.parallelProcessing());
             recipientList.setParallelAggregate(recipientListAnnotation.parallelAggregate());

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index bc8e76c..7bff217 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -73,6 +73,8 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
     private Boolean shareUnitOfWork;
     @XmlAttribute
     private Boolean parallelAggregate;
+    @XmlAttribute
+    private Boolean stopOnAggregateException;
 
     public MulticastDefinition() {
     }
@@ -183,6 +185,20 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
         setParallelAggregate(true);
         return this;
     }
+    
+    /**
+     * If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used.
+     * Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used.
+     * Enabling this option allows to work around this behavior.
+     *
+     * The default value is <code>false</code> for the sake of backward compatibility.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition stopOnAggregateException() {
+        setStopOnAggregateException(true);
+        return this;
+    }
 
     /**
      * If enabled then Camel will process replies out-of-order, eg in the order they come back.
@@ -294,6 +310,7 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
         boolean isStreaming = getStreaming() != null && getStreaming();
         boolean isStopOnException = getStopOnException() != null && getStopOnException();
         boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate();
+        boolean isStopOnAggregateException = getStopOnAggregateException() != null && getStopOnAggregateException();
 
         boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
         ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing);
@@ -307,7 +324,7 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
         }
 
         MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing,
-                                      threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, isParallelAggregate);
+                                      threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, isParallelAggregate, isStopOnAggregateException);
         return answer;
     }
 
@@ -474,4 +491,12 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
         this.parallelAggregate = parallelAggregate;
     }
 
+    public Boolean getStopOnAggregateException() {
+        return stopOnAggregateException;
+    }
+
+    public void setStopOnAggregateException(Boolean stopOnAggregateException) {
+        this.stopOnAggregateException = stopOnAggregateException;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index 0d02a48..b7b3b85 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -83,6 +83,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
     private Integer cacheSize;
     @XmlAttribute
     private Boolean parallelAggregate;
+    @XmlAttribute
+    private Boolean stopOnAggregateException;
 
     public RecipientListDefinition() {
     }
@@ -115,6 +117,7 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
         boolean isStopOnException = getStopOnException() != null && getStopOnException();
         boolean isIgnoreInvalidEndpoints = getIgnoreInvalidEndpoints() != null && getIgnoreInvalidEndpoints();
+        boolean isStopOnAggregateException = getStopOnAggregateException() != null && getStopOnAggregateException();
 
         RecipientList answer;
         if (delimiter != null) {
@@ -129,6 +132,7 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         answer.setShareUnitOfWork(isShareUnitOfWork);
         answer.setStopOnException(isStopOnException);
         answer.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints);
+        answer.setStopOnAggregateException(isStopOnAggregateException);
         if (getCacheSize() != null) {
             answer.setCacheSize(getCacheSize());
         }
@@ -322,6 +326,20 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
     }
 
     /**
+     * If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used.
+     * Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used.
+     * Enabling this option allows to work around this behavior.
+     *
+     * The default value is <code>false</code> for the sake of backward compatibility.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> stopOnAggregateException() {
+        setStopOnAggregateException(true);
+        return this;
+    }
+
+    /**
      * If enabled then Camel will process replies out-of-order, eg in the order they come back.
      * If disabled, Camel will process replies in the same order as defined by the recipient list.
      *
@@ -599,4 +617,12 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
     public void setParallelAggregate(Boolean parallelAggregate) {
         this.parallelAggregate = parallelAggregate;
     }
+
+    public Boolean getStopOnAggregateException() {
+        return stopOnAggregateException;
+    }
+
+    public void setStopOnAggregateException(Boolean stopOnAggregateException) {
+        this.stopOnAggregateException = stopOnAggregateException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
index f98780f..e7305e8 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -72,6 +72,8 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
     private Boolean shareUnitOfWork;
     @XmlAttribute
     private Boolean parallelAggregate;
+    @XmlAttribute
+    private Boolean stopOnAggregateException;
 
     public SplitDefinition() {
     }
@@ -103,6 +105,7 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
         boolean isStreaming = getStreaming() != null && getStreaming();
         boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
         boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate();
+        boolean isStopOnAggregateException = getStopOnAggregateException() != null && getStopOnAggregateException();
         boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
         ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing);
 
@@ -118,7 +121,7 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
 
         Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
                             isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, isStopOnException(),
-                            timeout, onPrepare, isShareUnitOfWork, isParallelAggregate);
+                            timeout, onPrepare, isShareUnitOfWork, isParallelAggregate, isStopOnAggregateException);
         return answer;
     }
 
@@ -231,6 +234,20 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
         setParallelAggregate(true);
         return this;
     }
+    
+    /**
+     * If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used.
+     * Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used.
+     * Enabling this option allows to work around this behavior.
+     *
+     * The default value is <code>false</code> for the sake of backward compatibility.
+     *
+     * @return the builder
+     */
+    public SplitDefinition stopOnAggregateException() {
+        setStopOnAggregateException(true);
+        return this;
+    }
 
     /**
      * When in streaming mode, then the splitter splits the original message on-demand, and each splitted
@@ -390,6 +407,14 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
     public void setParallelAggregate(Boolean parallelAggregate) {
         this.parallelAggregate = parallelAggregate;
     }
+    
+    public Boolean getStopOnAggregateException() {
+        return this.stopOnAggregateException;
+    }
+
+    public void setStopOnAggregateException(Boolean stopOnAggregateException) {
+        this.stopOnAggregateException = stopOnAggregateException;
+    }
 
     public Boolean getStopOnException() {
         return stopOnException;

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index e0cd13d..b0def97 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -152,6 +152,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
     private final boolean parallelProcessing;
     private final boolean streaming;
     private final boolean parallelAggregate;
+    private final boolean stopOnAggregateException;
     private final boolean stopOnException;
     private final ExecutorService executorService;
     private final boolean shutdownExecutorService;
@@ -176,10 +177,17 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                 streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false);
     }
 
+    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
+                              ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare,
+                              boolean shareUnitOfWork, boolean parallelAggregate) {
+        this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
+             shareUnitOfWork, false, false);
+    }
+    
     public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                               boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming,
                               boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork,
-                              boolean parallelAggregate) {
+                              boolean parallelAggregate, boolean stopOnAggregateException) {
         notNull(camelContext, "camelContext");
         this.camelContext = camelContext;
         this.processors = processors;
@@ -194,6 +202,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
         this.onPrepare = onPrepare;
         this.shareUnitOfWork = shareUnitOfWork;
         this.parallelAggregate = parallelAggregate;
+        this.stopOnAggregateException = stopOnAggregateException;
     }
 
     @Override
@@ -530,10 +539,14 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                     doAggregate(getAggregationStrategy(subExchange), result, subExchange);
                 }
             } catch (Throwable e) {
-                // wrap in exception to explain where it failed
-                CamelExchangeException cex = new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e);
-                subExchange.setException(cex);
-                LOG.debug(cex.getMessage(), cex);
+                if (isStopOnAggregateException()) {
+                    throw e;
+                } else {
+                    // wrap in exception to explain where it failed
+                    CamelExchangeException cex = new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e);
+                    subExchange.setException(cex);
+                    LOG.debug(cex.getMessage(), cex);
+                }
             } finally {
                 aggregated.incrementAndGet();
             }
@@ -1294,6 +1307,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
         return parallelAggregate;
     }
 
+    public boolean isStopOnAggregateException() {
+        return stopOnAggregateException;
+    }
+
     public boolean isShareUnitOfWork() {
         return shareUnitOfWork;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index ded8ca9..7534e87 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -61,6 +61,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA
     private final String delimiter;
     private boolean parallelProcessing;
     private boolean parallelAggregate;
+    private boolean stopOnAggregateException;
     private boolean stopOnException;
     private boolean ignoreInvalidEndpoints;
     private boolean streaming;
@@ -145,7 +146,8 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA
 
         RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
-                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate()) {
+                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
+                isStopOnAggregateException()) {
             @Override
             protected synchronized ExecutorService createAggregateExecutorService(String name) {
                 // use a shared executor service to avoid creating new thread pools
@@ -250,6 +252,14 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA
         this.parallelAggregate = parallelAggregate;
     }
 
+    public boolean isStopOnAggregateException() {
+        return stopOnAggregateException;
+    }
+
+    public void setStopOnAggregateException(boolean stopOnAggregateException) {
+        this.stopOnAggregateException = stopOnAggregateException;
+    }
+
     public boolean isStopOnException() {
         return stopOnException;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index db6af86..33ef611 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -163,8 +163,15 @@ public class RecipientListProcessor extends MulticastProcessor {
     public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
                                   boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                                   boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) {
-        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
-                streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, parallelAggregate);
+        this(camelContext, producerCache, iter, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
+             shareUnitOfWork, parallelAggregate, false);
+    }
+
+    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
+                                  boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException,
+                                  long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) {
+        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
+              shareUnitOfWork, parallelAggregate, stopOnAggregateException);
         this.producerCache = producerCache;
         this.iter = iter;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index ba3be2e..8a06f79 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -69,12 +69,18 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
                 streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, false);
     }
 
-    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
-                    boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
-                    boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork,
-                    boolean parallelAggregate) {
-        super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService,
-                shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, parallelAggregate);
+    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
+                    ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare,
+                    boolean useSubUnitOfWork, boolean parallelAggregate) {
+        this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout,
+             onPrepare, useSubUnitOfWork, false, false);
+    }
+
+    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
+                    ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare,
+                    boolean useSubUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) {
+        super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException,
+              timeout, onPrepare, useSubUnitOfWork, parallelAggregate, stopOnAggregateException);
         this.expression = expression;
         notNull(expression, "expression");
         notNull(destination, "destination");

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
index 802e1b8..c593fa4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
@@ -29,7 +29,7 @@ import org.apache.camel.Exchange;
  * could be to sum up a total amount etc.
  * <p/>
  * Note that <tt>oldExchange</tt> may be <tt>null</tt> more than once when this strategy is throwing a {@link java.lang.RuntimeException}
- * and <tt>parallelProcessing</tt> is used.
+ * and <tt>parallelProcessing</tt> is used. You can work around this behavior using the <tt>stopOnAggregateException</tt> option.
  * <p/>
  * It is possible that <tt>newExchange</tt> is <tt>null</tt> which could happen if there was no data possible
  * to acquire. Such as when using a {@link org.apache.camel.processor.PollEnricher} to poll from a JMS queue which

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java
new file mode 100644
index 0000000..4b35ea8
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Tests the issue stated in
+ * <a href="https://issues.apache.org/jira/browse/CAMEL-10272">CAMEL-10272</a>.
+ */
+public class MulticastParallelWithAggregationStrategyThrowingExceptionTest extends ContextTestSupport {
+
+    public void testAggregationTimeExceptionWithParallelProcessing() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:end").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                // must use share UoW if we want the error handler to react on
+                // exceptions
+                // from the aggregation strategy also.
+                from("direct:start").
+                multicast(new MyAggregateBean()).parallelProcessing().stopOnAggregateException().shareUnitOfWork()
+                    .to("mock:a")
+                    .to("mock:b")
+               .end()
+                    .to("mock:end");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            throw new RuntimeException("Simulating a runtime exception thrown from the aggregation strategy");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
new file mode 100644
index 0000000..4509c23
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Tests the issue stated in
+ * <a href="https://issues.apache.org/jira/browse/CAMEL-10272">CAMEL-10272</a>.
+ */
+public class RecipientListParallelWithAggregationStrategyThrowingExceptionTest extends ContextTestSupport {
+
+    public void testAggregationTimeExceptionWithParallelProcessing() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:end").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "recipients", "mock:a,mock:b");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                // must use share UoW if we want the error handler to react on
+                // exceptions
+                // from the aggregation strategy also.
+                from("direct:start").
+                recipientList(header("recipients")).aggregationStrategy(new MyAggregateBean()).
+                parallelProcessing().stopOnAggregateException().shareUnitOfWork()
+                .end()
+                    .to("mock:end");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            throw new RuntimeException("Simulating a runtime exception thrown from the aggregation strategy");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java
new file mode 100644
index 0000000..66743f4
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Tests the issue stated in
+ * <a href="https://issues.apache.org/jira/browse/CAMEL-10272">CAMEL-10272</a>.
+ */
+public class SplitterParallelWithAggregationStrategyThrowingExceptionTest extends ContextTestSupport {
+
+    public void testAggregationTimeExceptionWithParallelProcessing() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(2);
+        getMockEndpoint("mock:end").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello@World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                // must use share UoW if we want the error handler to react on
+                // exceptions
+                // from the aggregation strategy also.
+                from("direct:start").
+                split(body().tokenize("@")).aggregationStrategy(new MyAggregateBean()).
+                parallelProcessing().stopOnAggregateException().shareUnitOfWork()
+                    .to("mock:a")
+               .end()
+                    .to("mock:end");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            throw new RuntimeException("Simulating a runtime exception thrown from the aggregation strategy");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java
index 8adac73..486bccf 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java
@@ -67,7 +67,7 @@ public class ManagedMulticastTest extends ManagementTestSupport {
 
         data = (TabularData) mbeanServer.invoke(name, "explain", new Object[]{true}, new String[]{"boolean"});
         assertNotNull(data);
-        assertEquals(14, data.size());
+        assertEquals(15, data.size());
 
         String json = (String) mbeanServer.invoke(name, "informationJson", null, null);
         assertNotNull(json);

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java
index 0e041e3..a37d6ca 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java
@@ -91,7 +91,7 @@ public class ManagedRecipientListTest extends ManagementTestSupport {
 
         data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"});
         assertNotNull(data);
-        assertEquals(17, data.size());
+        assertEquals(18, data.size());
 
         String json = (String) mbeanServer.invoke(on, "informationJson", null, null);
         assertNotNull(json);

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java
index c13b342..3276f9e 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java
@@ -75,7 +75,7 @@ public class ManagedSplitterTest extends ManagementTestSupport {
 
         data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"});
         assertNotNull(data);
-        assertEquals(15, data.size());
+        assertEquals(16, data.size());
 
         String json = (String) mbeanServer.invoke(on, "informationJson", null, null);
         assertNotNull(json);


[2/2] camel git commit: Component docs

Posted by da...@apache.org.
Component docs


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3a227f2c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3a227f2c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3a227f2c

Branch: refs/heads/master
Commit: 3a227f2c41344a2cb499de4293a494bd462a30b7
Parents: 43036a4
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Jan 1 15:59:53 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Jan 1 15:59:53 2017 +0100

----------------------------------------------------------------------
 camel-core/src/main/docs/class-component.adoc | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3a227f2c/camel-core/src/main/docs/class-component.adoc
----------------------------------------------------------------------
diff --git a/camel-core/src/main/docs/class-component.adoc b/camel-core/src/main/docs/class-component.adoc
index a93e384..ae44978 100644
--- a/camel-core/src/main/docs/class-component.adoc
+++ b/camel-core/src/main/docs/class-component.adoc
@@ -33,17 +33,14 @@ The Class component has no options.
 
 
 // endpoint options: START
-The Class component supports 9 endpoint options which are listed below:
+The Class component supports 6 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
-| beanName | common |  | String | *Required* Sets the name of the bean to invoke
-| method | common |  | String | Sets the name of the method to invoke on the bean
-| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored.
-| exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
-| exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the default exchange pattern when creating an exchange.
+| beanName | producer |  | String | *Required* Sets the name of the bean to invoke
+| method | producer |  | String | Sets the name of the method to invoke on the bean
 | cache | advanced | false | boolean | If enabled Camel will cache the result of the first Registry look-up. Cache can be enabled if the bean in the Registry is defined as a singleton scope.
 | multiParameterArray | advanced | false | boolean | How to treat the parameters which are passed from the message body; if it is true the message body should be an array of parameters. Note: This option is used internally by Camel and is not intended for end users to use.
 | parameters | advanced |  | Map | Used for configuring additional properties on the bean