You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/06 10:15:41 UTC

[camel] branch master updated: CAMEL-10845: Java DSL add Java8 Supplier support

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new e38f2a0  CAMEL-10845: Java DSL add Java8 Supplier support
e38f2a0 is described below

commit e38f2a0751445478a2a71620407a723ffab9ab04
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 6 12:06:50 2019 +0200

    CAMEL-10845: Java DSL add Java8 Supplier support
---
 .../apache/camel/model/AggregateDefinition.java    | 49 ++++++++++++++++++++++
 .../org/apache/camel/model/CatchDefinition.java    |  4 ++
 .../apache/camel/model/ClaimCheckDefinition.java   | 10 +++++
 .../org/apache/camel/model/EnrichDefinition.java   | 10 +++++
 .../model/ExecutorServiceAwareDefinition.java      | 11 +++++
 .../camel/model/IdempotentConsumerDefinition.java  | 12 ++++++
 .../java/org/apache/camel/model/LogDefinition.java |  1 +
 .../apache/camel/model/MulticastDefinition.java    | 25 +++++++++++
 .../apache/camel/model/OnCompletionDefinition.java |  7 +---
 .../apache/camel/model/PollEnrichDefinition.java   | 10 +++++
 .../apache/camel/model/ProcessorDefinition.java    | 37 +++++++++++++++-
 .../camel/model/RecipientListDefinition.java       | 23 ++++++++++
 .../org/apache/camel/model/RouteDefinition.java    | 16 +++++--
 .../org/apache/camel/model/SplitDefinition.java    | 23 ++++++++++
 .../org/apache/camel/model/WireTapDefinition.java  | 26 ++++++++++++
 .../camel/processor/IdempotentConsumerDslTest.java |  7 +++-
 .../camel/processor/SplitterOnPrepareTest.java     |  2 +-
 .../AggregateEagerCheckCompletionTest.java         |  5 ++-
 18 files changed, 265 insertions(+), 13 deletions(-)

diff --git a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 2fe4b5c..b7a1146 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -730,7 +731,10 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
 
     /**
      * Sets the AggregationStrategy to use with a fluent builder.
+     *
+     * @deprecated use {@link #aggregationStrategy()}
      */
+    @Deprecated
     public AggregationStrategyClause<AggregateDefinition> strategy() {
         return aggregationStrategy();
     }
@@ -740,7 +744,9 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
      *
      * @param aggregationStrategy  the aggregate strategy to use
      * @return the builder
+     * @deprecated use {@link #aggregationStrategy(AggregationStrategy)}
      */
+    @Deprecated
     public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
         return aggregationStrategy(aggregationStrategy);
     }
@@ -759,6 +765,17 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     /**
      * Sets the aggregate strategy to use
      *
+     * @param aggregationStrategy  the aggregate strategy to use
+     * @return the builder
+     */
+    public AggregateDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy.get());
+        return this;
+    }
+
+    /**
+     * Sets the aggregate strategy to use
+     *
      * @param aggregationStrategyRef  reference to the strategy to lookup in the registry
      * @return the builder
      */
@@ -802,6 +819,19 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * Sets the custom aggregate repository to use.
+     * <p/>
+     * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
+     *
+     * @param aggregationRepository  the aggregate repository to use
+     * @return the builder
+     */
+    public AggregateDefinition aggregationRepository(Supplier<AggregationRepository> aggregationRepository) {
+        setAggregationRepository(aggregationRepository.get());
+        return this;
+    }
+
+    /**
      * Sets the custom aggregate repository to use
      * <p/>
      * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
@@ -951,6 +981,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
      * background thread is created to check for the completion for every aggregator.
      * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.
      */
+    public AggregateDefinition timeoutCheckerExecutorService(Supplier<ScheduledExecutorService> executorService) {
+        setTimeoutCheckerExecutorService(executorService.get());
+        return this;
+    }
+
+    /**
+     * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a
+     * background thread is created to check for the completion for every aggregator.
+     * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.
+     */
     public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) {
         setTimeoutCheckerExecutorServiceRef(executorServiceRef);
         return this;
@@ -965,6 +1005,15 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         return this;
     }
 
+    /**
+     * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control
+     * this aggregator.
+     */
+    public AggregateDefinition aggregateController(Supplier<AggregateController> aggregateController) {
+        setAggregateController(aggregateController.get());
+        return this;
+    }
+
     // Section - Methods from ExpressionNode
     // Needed to copy methods from ExpressionNode here so that I could specify the
     // correlation expression as optional in JAXB
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/CatchDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/CatchDefinition.java
index acba75a..e447aaa 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/CatchDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/CatchDefinition.java
@@ -98,7 +98,9 @@ public class CatchDefinition extends ProcessorDefinition<CatchDefinition> implem
      *
      * @param exceptionClasses  a list of the exception classes
      * @return the builder
+     * @deprecated use {@link #exception(Class[])}
      */
+    @Deprecated
     public CatchDefinition exceptionClasses(List<Class<? extends Throwable>> exceptionClasses) {
         setExceptionClasses(exceptionClasses);
         return this;
@@ -139,7 +141,9 @@ public class CatchDefinition extends ProcessorDefinition<CatchDefinition> implem
      *
      * @param exception  the exception of class
      * @return the builder
+     * @deprecated use {@link #exception(Class[])}
      */
+    @Deprecated
     public CatchDefinition exceptionClasses(Class<? extends Throwable> exception) {
         List<Class<? extends Throwable>> list = getExceptionClasses();
         list.add(exception);
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
index 54240df..09674d4 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.model;
 
+import java.util.function.Supplier;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -144,6 +145,15 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
      * To use a custom {@link AggregationStrategy} instead of the default implementation.
      * Notice you cannot use both custom aggregation strategy and configure data at the same time.
      */
+    public ClaimCheckDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy.get());
+        return this;
+    }
+
+    /**
+     * To use a custom {@link AggregationStrategy} instead of the default implementation.
+     * Notice you cannot use both custom aggregation strategy and configure data at the same time.
+     */
     public ClaimCheckDefinition aggregationStrategyRef(String aggregationStrategyRef) {
         setAggregationStrategyRef(aggregationStrategyRef);
         return this;
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 498349f..156ffd1 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.model;
 
+import java.util.function.Supplier;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -88,6 +89,15 @@ public class EnrichDefinition extends ExpressionNode {
     }
 
     /**
+     * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+     * By default Camel will use the reply from the external service as outgoing message.
+     */
+    public EnrichDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy.get());
+        return this;
+    }
+
+    /**
      * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
      * By default Camel will use the reply from the external service as outgoing message.
      */
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
index b836785..843dca6 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
@@ -17,6 +17,7 @@
 package org.apache.camel.model;
 
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import org.apache.camel.ExecutorServiceAware;
 
@@ -36,6 +37,16 @@ public interface ExecutorServiceAwareDefinition<Type extends ProcessorDefinition
     /**
      * Setting the executor service for executing
      *
+     * @param executorService the executor service
+     * @return the builder
+     */
+    default Type executorService(Supplier<ExecutorService> executorService) {
+        return executorService(executorService.get());
+    }
+
+    /**
+     * Setting the executor service for executing
+     *
      * @param executorServiceRef reference for a {@link java.util.concurrent.ExecutorService}
      *                           to lookup in the {@link org.apache.camel.spi.Registry}
      * @return the builder
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
index b180c00..d293234 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.model;
 
+import java.util.function.Supplier;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -97,6 +98,17 @@ public class IdempotentConsumerDefinition extends OutputExpressionNode {
     }
 
     /**
+     * Sets the message id repository for the idempotent consumer
+     *
+     * @param idempotentRepository the idempotent repository instance
+     * @return builder
+     */
+    public IdempotentConsumerDefinition messageIdRepository(Supplier<IdempotentRepository> idempotentRepository) {
+        setMessageIdRepository(idempotentRepository.get());
+        return this;
+    }
+
+    /**
      * Sets whether to eagerly add the key to the idempotent repository or wait until the exchange
      * is complete. Eager is default enabled.
      *
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java
index 24b4bf8..c8e1b0a 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 @XmlRootElement(name = "log")
 @XmlAccessorType(XmlAccessType.FIELD)
 public class LogDefinition extends NoOutputDefinition<LogDefinition> {
+
     @XmlAttribute(required = true)
     private String message;
     @XmlAttribute @Metadata(defaultValue = "INFO")
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 7f2b9af..9f3f993 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -17,6 +17,7 @@
 package org.apache.camel.model;
 
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -110,6 +111,17 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
     }
 
     /**
+     * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
+     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
+     * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
+     * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.
+     */
+    public MulticastDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy.get());
+        return this;
+    }
+
+    /**
      * Sets a reference to the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
      * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
      * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
@@ -269,6 +281,19 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
      * This can be used to deep-clone messages that should be send, or any custom logic needed before
      * the exchange is send.
      *
+     * @param onPrepare the processor
+     * @return the builder
+     */
+    public MulticastDefinition onPrepare(Supplier<Processor> onPrepare) {
+        setOnPrepare(onPrepare.get());
+        return this;
+    }
+
+    /**
+     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
+     * This can be used to deep-clone messages that should be sent, or any custom logic needed before
+     * the exchange is sent.
+     *
      * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
      * @return the builder
      */
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
index fafe59e..ba718ae 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
@@ -107,12 +107,7 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
      * @param definition the parent definition that is the route
      */
     public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) {
-        for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) {
-            ProcessorDefinition<?> out = it.next();
-            if (out instanceof OnCompletionDefinition) {
-                it.remove();
-            }
-        }
+        definition.getOutputs().removeIf(out -> out instanceof OnCompletionDefinition);
     }
 
     @Override
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 6446d5b..f3f35c1 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.model;
 
+import java.util.function.Supplier;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -104,6 +105,15 @@ public class PollEnrichDefinition extends ExpressionNode {
     }
 
     /**
+     * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+     * By default Camel will use the reply from the external service as outgoing message.
+     */
+    public PollEnrichDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy.get());
+        return this;
+    }
+
+    /**
      * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
      * By default Camel will use the reply from the external service as outgoing message.
      */
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 73e45c8..ea41d88 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2253,6 +2253,18 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
 
     /**
      * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
+     * Adds the custom processor to this destination which could be a final
+     * destination, or could be a transformation in a pipeline
+     *
+     * @param processor  the custom {@link Processor}
+     * @return the builder
+     */
+    public Type process(Supplier<Processor> processor) {
+        return process(processor.get());
+    }
+
+    /**
+     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
      * Adds the custom processor reference to this destination which could be a final
      * destination, or could be a transformation in a pipeline
      *
@@ -2304,6 +2316,17 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
      *
      * @param bean  the bean to invoke, or a reference to a bean if the type is a String
+     * @return the builder
+     */
+    public Type bean(Supplier<Object> bean) {
+        return bean(bean.get());
+    }
+
+    /**
+     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
+     * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
+     *
+     * @param bean  the bean to invoke, or a reference to a bean if the type is a String
      * @param method  the method name to invoke on the bean (can be used to avoid ambiguity)
      * @return the builder
      */
@@ -2318,7 +2341,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
         addOutput(answer);
         return asType();
     }
-    
+
+    /**
+     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
+     * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
+     *
+     * @param bean  the bean to invoke, or a reference to a bean if the type is a String
+     * @param method  the method name to invoke on the bean (can be used to avoid ambiguity)
+     * @return the builder
+     */
+    public Type bean(Supplier<Object> bean, String method) {
+        return bean(bean.get(), method);
+    }
+
     /**
      * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
      * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index d396deb..3f3da08 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -17,6 +17,7 @@
 package org.apache.camel.model;
 
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -134,6 +135,15 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
     }
 
     /**
+     * Sets the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
+     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
+     */
+    public RecipientListDefinition<Type> aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy.get());
+        return this;
+    }
+
+    /**
      * Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
      * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
      */
@@ -288,6 +298,19 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
     }
 
     /**
+     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
+     * This can be used to deep-clone messages that should be sent, or any custom logic needed before
+     * the exchange is sent.
+     *
+     * @param onPrepare the processor
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> onPrepare(Supplier<Processor> onPrepare) {
+        setOnPrepare(onPrepare.get());
+        return this;
+    }
+
+    /**
      * Sets the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send using a fluent buidler.
      */
     public ProcessClause<RecipientListDefinition<Type>> onPrepare() {
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
index 680bce0..13c2a0e 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
@@ -17,8 +17,10 @@
 package org.apache.camel.model;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -452,13 +454,21 @@ public class RouteDefinition extends ProcessorDefinition<RouteDefinition> implem
         if (routePolicies == null) {
             routePolicies = new ArrayList<>();
         }
-        for (RoutePolicy policy : policies) {
-            routePolicies.add(policy);
-        }
+        routePolicies.addAll(Arrays.asList(policies));
         return this;
     }
 
     /**
+     * Configures a route policy for this route
+     *
+     * @param policy route policy
+     * @return the builder
+     */
+    public RouteDefinition routePolicy(Supplier<RoutePolicy> policy) {
+        return routePolicy(policy.get());
+    }
+
+    /**
      * Configures a route policy for this route
      *
      * @param routePolicyRef reference to a {@link RoutePolicy} to lookup and use.
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
index b31fc8b..4322ec2 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -17,6 +17,7 @@
 package org.apache.camel.model;
 
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -107,6 +108,15 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer
     }
 
     /**
+     * Sets the AggregationStrategy to be used to assemble the replies from the split messages, into a single outgoing message from the Splitter.
+     * By default Camel will use the original incoming message to the splitter (leave it unchanged). You can also use a POJO as the AggregationStrategy.
+     */
+    public SplitDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy.get());
+        return this;
+    }
+
+    /**
      * Sets a reference to the AggregationStrategy to be used to assemble the replies from the splitted messages, into a single outgoing message from the Splitter.
      * By default Camel will use the original incoming message to the splitter (leave it unchanged). You can also use a POJO as the AggregationStrategy
      */
@@ -264,6 +274,19 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer
      * This can be used to deep-clone messages that should be send, or any custom logic needed before
      * the exchange is send.
      *
+     * @param onPrepare the processor
+     * @return the builder
+     */
+    public SplitDefinition onPrepare(Supplier<Processor> onPrepare) {
+        setOnPrepare(onPrepare.get());
+        return this;
+    }
+
+    /**
+     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
+     * This can be used to deep-clone messages that should be sent, or any custom logic needed before
+     * the exchange is sent.
+     *
      * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
      * @return the builder
      */
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
index dfd85d4..aca29d7 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -19,6 +19,7 @@ package org.apache.camel.model;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -204,6 +205,18 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends T
     }
 
     /**
+     * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly}
+     *
+     * @param processor  processor preparing the new exchange to send
+     * @return the builder
+     * @see #newExchangeHeader(String, org.apache.camel.Expression)
+     */
+    public WireTapDefinition<Type> newExchange(Supplier<Processor> processor) {
+        setNewExchangeProcessor(processor.get());
+        return this;
+    }
+
+    /**
      * Sets a header on the <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly}.
      * <p/>
      * Use this together with the {@link #newExchangeBody(org.apache.camel.Expression)} or {@link #newExchange(org.apache.camel.Processor)}
@@ -236,6 +249,19 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends T
      * This can be used to deep-clone messages that should be send, or any custom logic needed before
      * the exchange is send.
      *
+     * @param onPrepare the processor
+     * @return the builder
+     */
+    public WireTapDefinition<Type> onPrepare(Supplier<Processor> onPrepare) {
+        setOnPrepare(onPrepare.get());
+        return this;
+    }
+
+    /**
+     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
+     * This can be used to deep-clone messages that should be sent, or any custom logic needed before
+     * the exchange is sent.
+     *
      * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
      * @return the builder
      */
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
index 9a5ffdd..8862af3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
 import org.junit.Test;
 
@@ -39,6 +40,10 @@ public class IdempotentConsumerDslTest extends ContextTestSupport {
         mock.assertIsSatisfied();
     }
 
+    public IdempotentRepository createRepo() {
+        return MemoryIdempotentRepository.memoryIdempotentRepository(200);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -47,7 +52,7 @@ public class IdempotentConsumerDslTest extends ContextTestSupport {
                 from("direct:start")
                     .idempotentConsumer()
                         .message(m -> m.getHeader("messageId"))
-                        .messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200))
+                        .messageIdRepository(() -> createRepo())
                     .to("mock:result");
             }
         };
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterOnPrepareTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterOnPrepareTest.java
index d1b1c29..c2c79a2 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterOnPrepareTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterOnPrepareTest.java
@@ -49,7 +49,7 @@ public class SplitterOnPrepareTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .split(body()).onPrepare(new FixNamePrepare())
+                    .split(body()).onPrepare(FixNamePrepare::new)
                         .to("direct:a");
 
                 from("direct:a").process(new ProcessorA()).to("mock:a");
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
index 15c5659..8991989 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor.aggregator;
 
+import java.util.concurrent.ForkJoinPool;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
@@ -56,7 +58,8 @@ public class AggregateEagerCheckCompletionTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                    .aggregate(header("id"))
+                        .aggregationStrategy(BodyInAggregatingStrategy::new)
                         .completionPredicate(body().isEqualTo("A+B+END"))
                         .to("mock:result");
             }