You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/06 10:15:41 UTC
[camel] branch master updated: CAMEL-10845: Java DSL add Java8
Supplier support
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new e38f2a0 CAMEL-10845: Java DSL add Java8 Supplier support
e38f2a0 is described below
commit e38f2a0751445478a2a71620407a723ffab9ab04
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 6 12:06:50 2019 +0200
CAMEL-10845: Java DSL add Java8 Supplier support
---
.../apache/camel/model/AggregateDefinition.java | 49 ++++++++++++++++++++++
.../org/apache/camel/model/CatchDefinition.java | 4 ++
.../apache/camel/model/ClaimCheckDefinition.java | 10 +++++
.../org/apache/camel/model/EnrichDefinition.java | 10 +++++
.../model/ExecutorServiceAwareDefinition.java | 11 +++++
.../camel/model/IdempotentConsumerDefinition.java | 12 ++++++
.../java/org/apache/camel/model/LogDefinition.java | 1 +
.../apache/camel/model/MulticastDefinition.java | 25 +++++++++++
.../apache/camel/model/OnCompletionDefinition.java | 7 +---
.../apache/camel/model/PollEnrichDefinition.java | 10 +++++
.../apache/camel/model/ProcessorDefinition.java | 37 +++++++++++++++-
.../camel/model/RecipientListDefinition.java | 23 ++++++++++
.../org/apache/camel/model/RouteDefinition.java | 16 +++++--
.../org/apache/camel/model/SplitDefinition.java | 23 ++++++++++
.../org/apache/camel/model/WireTapDefinition.java | 26 ++++++++++++
.../camel/processor/IdempotentConsumerDslTest.java | 7 +++-
.../camel/processor/SplitterOnPrepareTest.java | 2 +-
.../AggregateEagerCheckCompletionTest.java | 5 ++-
18 files changed, 265 insertions(+), 13 deletions(-)
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 2fe4b5c..b7a1146 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -730,7 +731,10 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
/**
* Sets the AggregationStrategy to use with a fluent builder.
+ *
+ * @deprecated use {@link #aggregationStrategy()}
*/
+ @Deprecated
public AggregationStrategyClause<AggregateDefinition> strategy() {
return aggregationStrategy();
}
@@ -740,7 +744,9 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
*
* @param aggregationStrategy the aggregate strategy to use
* @return the builder
+ * @deprecated use {@link #aggregationStrategy(AggregationStrategy)}
*/
+ @Deprecated
public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
return aggregationStrategy(aggregationStrategy);
}
@@ -759,6 +765,17 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
/**
* Sets the aggregate strategy to use
*
+ * @param aggregationStrategy the aggregate strategy to use
+ * @return the builder
+ */
+ public AggregateDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy.get());
+ return this;
+ }
+
+ /**
+ * Sets the aggregate strategy to use
+ *
* @param aggregationStrategyRef reference to the strategy to lookup in the registry
* @return the builder
*/
@@ -802,6 +819,19 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
}
/**
+ * Sets the custom aggregate repository to use.
+ * <p/>
+ * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
+ *
+ * @param aggregationRepository the aggregate repository to use
+ * @return the builder
+ */
+ public AggregateDefinition aggregationRepository(Supplier<AggregationRepository> aggregationRepository) {
+ setAggregationRepository(aggregationRepository.get());
+ return this;
+ }
+
+ /**
* Sets the custom aggregate repository to use
* <p/>
* Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
@@ -951,6 +981,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
* background thread is created to check for the completion for every aggregator.
* Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.
*/
+ public AggregateDefinition timeoutCheckerExecutorService(Supplier<ScheduledExecutorService> executorService) {
+ setTimeoutCheckerExecutorService(executorService.get());
+ return this;
+ }
+
+ /**
+ * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a
+ * background thread is created to check for the completion for every aggregator.
+ * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.
+ */
public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) {
setTimeoutCheckerExecutorServiceRef(executorServiceRef);
return this;
@@ -965,6 +1005,15 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
return this;
}
+ /**
+ * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control
+ * this aggregator.
+ */
+ public AggregateDefinition aggregateController(Supplier<AggregateController> aggregateController) {
+ setAggregateController(aggregateController.get());
+ return this;
+ }
+
// Section - Methods from ExpressionNode
// Needed to copy methods from ExpressionNode here so that I could specify the
// correlation expression as optional in JAXB
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/CatchDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/CatchDefinition.java
index acba75a..e447aaa 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/CatchDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/CatchDefinition.java
@@ -98,7 +98,9 @@ public class CatchDefinition extends ProcessorDefinition<CatchDefinition> implem
*
* @param exceptionClasses a list of the exception classes
* @return the builder
+ * @deprecated use {@link #exception(Class[])}
*/
+ @Deprecated
public CatchDefinition exceptionClasses(List<Class<? extends Throwable>> exceptionClasses) {
setExceptionClasses(exceptionClasses);
return this;
@@ -139,7 +141,9 @@ public class CatchDefinition extends ProcessorDefinition<CatchDefinition> implem
*
* @param exception the exception of class
* @return the builder
+ * @deprecated use {@link #exception(Class[])}
*/
+ @Deprecated
public CatchDefinition exceptionClasses(Class<? extends Throwable> exception) {
List<Class<? extends Throwable>> list = getExceptionClasses();
list.add(exception);
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
index 54240df..09674d4 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.model;
+import java.util.function.Supplier;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -144,6 +145,15 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
* To use a custom {@link AggregationStrategy} instead of the default implementation.
* Notice you cannot use both custom aggregation strategy and configure data at the same time.
*/
+ public ClaimCheckDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy.get());
+ return this;
+ }
+
+ /**
+ * To use a custom {@link AggregationStrategy} instead of the default implementation.
+ * Notice you cannot use both custom aggregation strategy and configure data at the same time.
+ */
public ClaimCheckDefinition aggregationStrategyRef(String aggregationStrategyRef) {
setAggregationStrategyRef(aggregationStrategyRef);
return this;
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 498349f..156ffd1 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.model;
+import java.util.function.Supplier;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -88,6 +89,15 @@ public class EnrichDefinition extends ExpressionNode {
}
/**
+ * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+ * By default Camel will use the reply from the external service as outgoing message.
+ */
+ public EnrichDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy.get());
+ return this;
+ }
+
+ /**
* Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
* By default Camel will use the reply from the external service as outgoing message.
*/
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
index b836785..843dca6 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/ExecutorServiceAwareDefinition.java
@@ -17,6 +17,7 @@
package org.apache.camel.model;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import org.apache.camel.ExecutorServiceAware;
@@ -36,6 +37,16 @@ public interface ExecutorServiceAwareDefinition<Type extends ProcessorDefinition
/**
* Setting the executor service for executing
*
+ * @param executorService the executor service
+ * @return the builder
+ */
+ default Type executorService(Supplier<ExecutorService> executorService) {
+ return executorService(executorService.get());
+ }
+
+ /**
+ * Setting the executor service for executing
+ *
* @param executorServiceRef reference for a {@link java.util.concurrent.ExecutorService}
* to lookup in the {@link org.apache.camel.spi.Registry}
* @return the builder
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
index b180c00..d293234 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.model;
+import java.util.function.Supplier;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -97,6 +98,17 @@ public class IdempotentConsumerDefinition extends OutputExpressionNode {
}
/**
+ * Sets the message id repository for the idempotent consumer
+ *
+ * @param idempotentRepository the repository instance of idempotent
+ * @return builder
+ */
+ public IdempotentConsumerDefinition messageIdRepository(Supplier<IdempotentRepository> idempotentRepository) {
+ setMessageIdRepository(idempotentRepository.get());
+ return this;
+ }
+
+ /**
* Sets whether to eagerly add the key to the idempotent repository or wait until the exchange
* is complete. Eager is default enabled.
*
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java
index 24b4bf8..c8e1b0a 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
@XmlRootElement(name = "log")
@XmlAccessorType(XmlAccessType.FIELD)
public class LogDefinition extends NoOutputDefinition<LogDefinition> {
+
@XmlAttribute(required = true)
private String message;
@XmlAttribute @Metadata(defaultValue = "INFO")
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 7f2b9af..9f3f993 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -17,6 +17,7 @@
package org.apache.camel.model;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -110,6 +111,17 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
}
/**
+ * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
+ * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
+ * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
+ * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.
+ */
+ public MulticastDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy.get());
+ return this;
+ }
+
+ /**
* Sets a reference to the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
* By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
* If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
@@ -269,6 +281,19 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
* This can be used to deep-clone messages that should be send, or any custom logic needed before
* the exchange is send.
*
+ * @param onPrepare the processor
+ * @return the builder
+ */
+ public MulticastDefinition onPrepare(Supplier<Processor> onPrepare) {
+ setOnPrepare(onPrepare.get());
+ return this;
+ }
+
+ /**
+ * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
+ * This can be used to deep-clone messages that should be send, or any custom logic needed before
+ * the exchange is send.
+ *
* @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
* @return the builder
*/
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
index fafe59e..ba718ae 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
@@ -107,12 +107,7 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
* @param definition the parent definition that is the route
*/
public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) {
- for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) {
- ProcessorDefinition<?> out = it.next();
- if (out instanceof OnCompletionDefinition) {
- it.remove();
- }
- }
+ definition.getOutputs().removeIf(out -> out instanceof OnCompletionDefinition);
}
@Override
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 6446d5b..f3f35c1 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.model;
+import java.util.function.Supplier;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -104,6 +105,15 @@ public class PollEnrichDefinition extends ExpressionNode {
}
/**
+ * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+ * By default Camel will use the reply from the external service as outgoing message.
+ */
+ public PollEnrichDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy.get());
+ return this;
+ }
+
+ /**
* Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
* By default Camel will use the reply from the external service as outgoing message.
*/
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 73e45c8..ea41d88 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2253,6 +2253,18 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
/**
* <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
+ * Adds the custom processor to this destination which could be a final
+ * destination, or could be a transformation in a pipeline
+ *
+ * @param processor the custom {@link Processor}
+ * @return the builder
+ */
+ public Type process(Supplier<Processor> processor) {
+ return process(processor.get());
+ }
+
+ /**
+ * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
* Adds the custom processor reference to this destination which could be a final
* destination, or could be a transformation in a pipeline
*
@@ -2304,6 +2316,17 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
*
* @param bean the bean to invoke, or a reference to a bean if the type is a String
+ * @return the builder
+ */
+ public Type bean(Supplier<Object> bean) {
+ return bean(bean.get());
+ }
+
+ /**
+ * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
+ * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
+ *
+ * @param bean the bean to invoke, or a reference to a bean if the type is a String
* @param method the method name to invoke on the bean (can be used to avoid ambiguity)
* @return the builder
*/
@@ -2318,7 +2341,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
addOutput(answer);
return asType();
}
-
+
+ /**
+ * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
+ * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
+ *
+ * @param bean the bean to invoke, or a reference to a bean if the type is a String
+ * @param method the method name to invoke on the bean (can be used to avoid ambiguity)
+ * @return the builder
+ */
+ public Type bean(Supplier<Object> bean, String method) {
+ return bean(bean.get(), method);
+ }
+
/**
* <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
* Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index d396deb..3f3da08 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -17,6 +17,7 @@
package org.apache.camel.model;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -134,6 +135,15 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
}
/**
+ * Sets the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
+ * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
+ */
+ public RecipientListDefinition<Type> aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy.get());
+ return this;
+ }
+
+ /**
* Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
* By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
*/
@@ -288,6 +298,19 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
}
/**
+ * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send.
+ * This can be used to deep-clone messages that should be send, or any custom logic needed before
+ * the exchange is send.
+ *
+ * @param onPrepare the processor
+ * @return the builder
+ */
+ public RecipientListDefinition<Type> onPrepare(Supplier<Processor> onPrepare) {
+ setOnPrepare(onPrepare.get());
+ return this;
+ }
+
+ /**
* Sets the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send using a fluent buidler.
*/
public ProcessClause<RecipientListDefinition<Type>> onPrepare() {
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
index 680bce0..13c2a0e 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java
@@ -17,8 +17,10 @@
package org.apache.camel.model;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -452,13 +454,21 @@ public class RouteDefinition extends ProcessorDefinition<RouteDefinition> implem
if (routePolicies == null) {
routePolicies = new ArrayList<>();
}
- for (RoutePolicy policy : policies) {
- routePolicies.add(policy);
- }
+ routePolicies.addAll(Arrays.asList(policies));
return this;
}
/**
+ * Configures route policy for this route
+ *
+ * @param policy route policy
+ * @return the builder
+ */
+ public RouteDefinition routePolicy(Supplier<RoutePolicy> policy) {
+ return routePolicy(policy.get());
+ }
+
+ /**
* Configures a route policy for this route
*
* @param routePolicyRef reference to a {@link RoutePolicy} to lookup and use.
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
index b31fc8b..4322ec2 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -17,6 +17,7 @@
package org.apache.camel.model;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -107,6 +108,15 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer
}
/**
+ * Sets the AggregationStrategy to be used to assemble the replies from the splitted messages, into a single outgoing message from the Splitter.
+ * By default Camel will use the original incoming message to the splitter (leave it unchanged). You can also use a POJO as the AggregationStrategy
+ */
+ public SplitDefinition aggregationStrategy(Supplier<AggregationStrategy> aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy.get());
+ return this;
+ }
+
+ /**
* Sets a reference to the AggregationStrategy to be used to assemble the replies from the splitted messages, into a single outgoing message from the Splitter.
* By default Camel will use the original incoming message to the splitter (leave it unchanged). You can also use a POJO as the AggregationStrategy
*/
@@ -264,6 +274,19 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer
* This can be used to deep-clone messages that should be send, or any custom logic needed before
* the exchange is send.
*
+ * @param onPrepare the processor
+ * @return the builder
+ */
+ public SplitDefinition onPrepare(Supplier<Processor> onPrepare) {
+ setOnPrepare(onPrepare.get());
+ return this;
+ }
+
+ /**
+ * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
+ * This can be used to deep-clone messages that should be send, or any custom logic needed before
+ * the exchange is send.
+ *
* @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
* @return the builder
*/
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
index dfd85d4..aca29d7 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -19,6 +19,7 @@ package org.apache.camel.model;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -204,6 +205,18 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends T
}
/**
+ * Sends a <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly}
+ *
+ * @param processor processor preparing the new exchange to send
+ * @return the builder
+ * @see #newExchangeHeader(String, org.apache.camel.Expression)
+ */
+ public WireTapDefinition<Type> newExchange(Supplier<Processor> processor) {
+ setNewExchangeProcessor(processor.get());
+ return this;
+ }
+
+ /**
* Sets a header on the <i>new</i> Exchange, instead of tapping an existing, using {@link ExchangePattern#InOnly}.
* <p/>
* Use this together with the {@link #newExchangeBody(org.apache.camel.Expression)} or {@link #newExchange(org.apache.camel.Processor)}
@@ -236,6 +249,19 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends T
* This can be used to deep-clone messages that should be send, or any custom logic needed before
* the exchange is send.
*
+ * @param onPrepare the processor
+ * @return the builder
+ */
+ public WireTapDefinition<Type> onPrepare(Supplier<Processor> onPrepare) {
+ setOnPrepare(onPrepare.get());
+ return this;
+ }
+
+ /**
+ * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
+ * This can be used to deep-clone messages that should be send, or any custom logic needed before
+ * the exchange is send.
+ *
* @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
* @return the builder
*/
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
index 9a5ffdd..8862af3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
import org.junit.Test;
@@ -39,6 +40,10 @@ public class IdempotentConsumerDslTest extends ContextTestSupport {
mock.assertIsSatisfied();
}
+ public IdempotentRepository createRepo() {
+ return MemoryIdempotentRepository.memoryIdempotentRepository(200);
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -47,7 +52,7 @@ public class IdempotentConsumerDslTest extends ContextTestSupport {
from("direct:start")
.idempotentConsumer()
.message(m -> m.getHeader("messageId"))
- .messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200))
+ .messageIdRepository(() -> createRepo())
.to("mock:result");
}
};
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterOnPrepareTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterOnPrepareTest.java
index d1b1c29..c2c79a2 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterOnPrepareTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterOnPrepareTest.java
@@ -49,7 +49,7 @@ public class SplitterOnPrepareTest extends ContextTestSupport {
@Override
public void configure() throws Exception {
from("direct:start")
- .split(body()).onPrepare(new FixNamePrepare())
+ .split(body()).onPrepare(FixNamePrepare::new)
.to("direct:a");
from("direct:a").process(new ProcessorA()).to("mock:a");
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
index 15c5659..8991989 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateEagerCheckCompletionTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor.aggregator;
+import java.util.concurrent.ForkJoinPool;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.BodyInAggregatingStrategy;
@@ -56,7 +58,8 @@ public class AggregateEagerCheckCompletionTest extends ContextTestSupport {
@Override
public void configure() throws Exception {
from("direct:start")
- .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .aggregate(header("id"))
+ .aggregationStrategy(BodyInAggregatingStrategy::new)
.completionPredicate(body().isEqualTo("A+B+END"))
.to("mock:result");
}