You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2017/02/16 19:01:56 UTC
Re: camel git commit: CAMEL-10724: Improve Java DSL support for Java 8
Hi Luca
This happens to me too. There is a little secret: if you change
the Message API, then the Scala DSL needs to be changed accordingly.
As it stands, camel-scala cannot be compiled.
On Thu, Feb 16, 2017 at 6:10 PM, <lb...@apache.org> wrote:
> Repository: camel
> Updated Branches:
> refs/heads/master 8e0e3083e -> 8bc8484b1
>
>
> CAMEL-10724: Improve Java DSL support for Java 8
>
>
> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bc8484b
> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bc8484b
> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bc8484b
>
> Branch: refs/heads/master
> Commit: 8bc8484b1914f5cb29191e7b91fe48e02ca1f636
> Parents: 8e0e308
> Author: lburgazzoli <lb...@gmail.com>
> Authored: Wed Jan 18 18:09:08 2017 +0100
> Committer: lburgazzoli <lb...@gmail.com>
> Committed: Thu Feb 16 18:10:22 2017 +0100
>
> ----------------------------------------------------------------------
> .../src/main/java/org/apache/camel/Message.java | 16 ++++-
> .../org/apache/camel/impl/DefaultMessage.java | 31 +++++++++
> .../apache/camel/model/AggregateDefinition.java | 20 ++++++
> .../model/IdempotentConsumerDefinition.java | 9 +--
> .../apache/camel/model/MulticastDefinition.java | 26 +++++++
> .../apache/camel/model/ProcessorDefinition.java | 66 +++++++++++++++++-
> .../org/apache/camel/util/ExchangeHelper.java | 18 +++++
> .../apache/camel/util/function/Suppliers.java | 43 ++++++++++++
> .../apache/camel/impl/DefaultExchangeTest.java | 5 ++
> .../camel/processor/DynamicRouter4Test.java | 58 ++++++++++++++++
> .../processor/IdempotentConsumerDslTest.java | 53 ++++++++++++++
> .../apache/camel/processor/LoopDoWhileTest.java | 23 ++++++-
> .../camel/processor/MulticastDslTest.java | 69 +++++++++++++++++++
> .../camel/processor/RoutingSlipDslTest.java | 49 +++++++++++++
> .../camel/processor/ThrottlerDslTest.java | 72 ++++++++++++++++++++
> .../processor/aggregator/AggregateDslTest.java | 41 +++++++++--
> 16 files changed, 582 insertions(+), 17 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/Message.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/Message.java b/camel-core/src/main/java/org/apache/camel/Message.java
> index 8cfeae0..a0e9c4d 100644
> --- a/camel-core/src/main/java/org/apache/camel/Message.java
> +++ b/camel-core/src/main/java/org/apache/camel/Message.java
> @@ -18,7 +18,7 @@ package org.apache.camel;
>
> import java.util.Map;
> import java.util.Set;
> -
> +import java.util.function.Supplier;
> import javax.activation.DataHandler;
>
> /**
> @@ -88,6 +88,13 @@ public interface Message {
> Object getHeader(String name, Object defaultValue);
>
> /**
> + * TODO: document
> + * Note: this is experimental and subject to changes in future releases.
> + *
> + */
> + Object getHeader(String name, Supplier<Object> defaultValueSupplier);
> +
> + /**
> * Returns a header associated with this message by name and specifying the
> * type required
> *
> @@ -112,6 +119,13 @@ public interface Message {
> <T> T getHeader(String name, Object defaultValue, Class<T> type);
>
> /**
> + * TODO: document
> + * Note: this is experimental and subject to changes in future releases.
> + *
> + */
> + <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type);
> +
> + /**
> * Sets a header on the message
> *
> * @param name of the header
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> index 848586a..1e49766 100644
> --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> @@ -20,6 +20,7 @@ import java.util.HashSet;
> import java.util.LinkedHashMap;
> import java.util.Map;
> import java.util.Set;
> +import java.util.function.Supplier;
> import javax.activation.DataHandler;
>
> import org.apache.camel.Attachment;
> @@ -65,6 +66,11 @@ public class DefaultMessage extends MessageSupport {
> return answer != null ? answer : defaultValue;
> }
>
> + public Object getHeader(String name, Supplier<Object> defaultValueSupplier) {
> + Object answer = getHeaders().get(name);
> + return answer != null ? answer : defaultValueSupplier.get();
> + }
> +
> @SuppressWarnings("unchecked")
> public <T> T getHeader(String name, Class<T> type) {
> Object value = getHeader(name);
> @@ -115,6 +121,31 @@ public class DefaultMessage extends MessageSupport {
> }
> }
>
> + @SuppressWarnings("unchecked")
> + public <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type) {
> + Object value = getHeader(name, defaultValueSupplier);
> + if (value == null) {
> + // lets avoid NullPointerException when converting to boolean for null values
> + if (boolean.class.isAssignableFrom(type)) {
> + return (T) Boolean.FALSE;
> + }
> + return null;
> + }
> +
> + // eager same instance type test to avoid the overhead of invoking the type converter
> + // if already same type
> + if (type.isInstance(value)) {
> + return type.cast(value);
> + }
> +
> + Exchange e = getExchange();
> + if (e != null) {
> + return e.getContext().getTypeConverter().convertTo(type, e, value);
> + } else {
> + return type.cast(value);
> + }
> + }
> +
> public void setHeader(String name, Object value) {
> if (headers == null) {
> headers = createHeaders();
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> index ec7d396..12f0a13 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> @@ -814,6 +814,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
> }
>
> /**
> + * TODO: document
> + * Note: this is experimental and subject to changes in future releases.
> + *
> + * @return the builder
> + */
> + public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
> + return aggregationStrategy(aggregationStrategy);
> + }
> +
> + /**
> * Sets the aggregate strategy to use
> *
> * @param aggregationStrategy the aggregate strategy to use
> @@ -930,6 +940,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
> }
>
> /**
> + * TODO: document
> + * Note: this is experimental and subject to changes in future releases.
> + *
> + * @return the builder
> + */
> + public AggregateDefinition completion(@AsPredicate Predicate predicate) {
> + return completionPredicate(predicate);
> + }
> +
> + /**
> * Indicates to complete all current aggregated exchanges when the context is stopped
> */
> public AggregateDefinition forceCompletionOnStop() {
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> index 256394d..9a58704 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> @@ -221,8 +221,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
> public Processor createProcessor(RouteContext routeContext) throws Exception {
> Processor childProcessor = this.createChildProcessor(routeContext, true);
>
> - IdempotentRepository<String> idempotentRepository =
> - (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
> + IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext);
> ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
>
> Expression expression = getExpression().createExpression(routeContext);
> @@ -231,6 +230,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
> boolean eager = getEager() == null || getEager();
> boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
> boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
> +
> // these boolean should be false by default
> boolean completionEager = getCompletionEager() != null && getCompletionEager();
>
> @@ -243,10 +243,11 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
> * @param routeContext route context
> * @return the repository
> */
> - protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
> + @SuppressWarnings("unchecked")
> + protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) {
> if (messageIdRepositoryRef != null) {
> idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
> }
> - return idempotentRepository;
> + return (IdempotentRepository<T>)idempotentRepository;
> }
> }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> index 7bff217..37efcc6 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> @@ -27,6 +27,8 @@ import javax.xml.bind.annotation.XmlTransient;
>
> import org.apache.camel.CamelContextAware;
> import org.apache.camel.Processor;
> +import org.apache.camel.builder.AggregationStrategyClause;
> +import org.apache.camel.builder.ProcessClause;
> import org.apache.camel.processor.MulticastProcessor;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
> import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
> @@ -106,6 +108,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
> // -------------------------------------------------------------------------
>
> /**
> + * TODO: document
> + * Note: this is experimental and subject to changes in future releases.
> + *
> + * @return the builder
> + */
> + public AggregationStrategyClause<MulticastDefinition> aggregationStrategy() {
> + AggregationStrategyClause<MulticastDefinition> clause = new AggregationStrategyClause<>(this);
> + setAggregationStrategy(clause);
> + return clause;
> + }
> +
> + /**
> * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
> * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
> * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
> @@ -248,6 +262,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
> }
>
> /**
> + * TODO: document
> + * Note: this is experimental and subject to changes in future releases.
> + *
> + * @return the builder
> + */
> + public ProcessClause<MulticastDefinition> onPrepare() {
> + ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this);
> + setOnPrepare(clause);
> + return clause;
> + }
> +
> + /**
> * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
> * This can be used to deep-clone messages that should be send, or any custom logic needed before
> * the exchange is send.
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> index 21fbe2e..c40a0bd 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> @@ -28,6 +28,7 @@ import java.util.Map;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicInteger;
> +import java.util.function.Supplier;
> import javax.xml.bind.annotation.XmlAccessType;
> import javax.xml.bind.annotation.XmlAccessorType;
> import javax.xml.bind.annotation.XmlAnyAttribute;
> @@ -73,6 +74,7 @@ import org.apache.camel.spi.InterceptStrategy;
> import org.apache.camel.spi.LifecycleStrategy;
> import org.apache.camel.spi.Policy;
> import org.apache.camel.spi.RouteContext;
> +import org.apache.camel.support.ExpressionAdapter;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> @@ -1408,6 +1410,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
> }
>
> /**
> + * TODO: document
> + * Note: this is experimental and subject to changes in future releases.
> + *
> + * @return the builder
> + */
> + public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer() {
> + IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
> + addOutput(answer);
> +
> + return ExpressionClause.createAndSetExpression(answer);
> + }
> +
> + /**
> * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
> * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
> * to avoid duplicate messages
> @@ -2096,7 +2111,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
> */
> public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
> AggregateDefinition answer = new AggregateDefinition();
> - ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(answer);
> + ExpressionClause<AggregateDefinition> clause = new ExpressionClause<>(answer);
> answer.setExpression(clause);
> answer.setAggregationStrategy(aggregationStrategy);
> addOutput(answer);
> @@ -2173,6 +2188,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
> }
>
> /**
> + * TODO: document
> + * Note: this is experimental and subject to changes in future releases.
> + *
> + * @return the builder
> + */
> + public ExpressionClause<ThrottleDefinition> throttle() {
> + ThrottleDefinition answer = new ThrottleDefinition();
> + addOutput(answer);
> +
> + return ExpressionClause.createAndSetExpression(answer);
> + }
> +
> + /**
> * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
> * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
> * or that we don't exceed an agreed SLA with some external service.
> @@ -2246,6 +2274,21 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
> }
>
> /**
> + * TODO: document
> + * Note: this is experimental and subject to changes in future releases.
> + *
> + * @return the builder
> + */
> + public ExpressionClause<LoopDefinition> loopDoWhile() {
> + LoopDefinition loop = new LoopDefinition();
> + loop.setDoWhile(true);
> +
> + addOutput(loop);
> +
> + return ExpressionClause.createAndSetExpression(loop);
> + }
> +
> + /**
> * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
> * Creates a loop allowing to process the a message a number of times and possibly process them
> * in a different way.
> @@ -3094,6 +3137,26 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
> }
>
> /**
> + * Adds a processor which sets the header on the IN message
> + *
> + * @param name the header name
> + * @param supplier the supplier used to set the header
> + * @return the builder
> + */
> + @SuppressWarnings("unchecked")
> + public Type setHeader(String name, final Supplier<Object> supplier) {
> + SetHeaderDefinition answer = new SetHeaderDefinition(name, new ExpressionAdapter() {
> + @Override
> + public Object evaluate(Exchange exchange) {
> + return supplier.get();
> + }
> + });
> +
> + addOutput(answer);
> + return (Type) this;
> + }
> +
> + /**
> * Adds a processor which sets the header on the OUT message
> *
> * @param name the header name
> @@ -4021,5 +4084,4 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
> public String getLabel() {
> return "";
> }
> -
> }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> index ce3fdca..6e5967e 100644
> --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> @@ -130,6 +130,24 @@ public final class ExchangeHelper {
> }
>
> /**
> + * Gets an header or property of the correct type
> + *
> + * @param exchange the exchange
> + * @param name the name of the header or the property
> + * @param type the type
> + * @return the header or property value
> + * @throws TypeConversionException is thrown if error during type conversion
> + * @throws NoSuchHeaderException is thrown if no headers exists
> + */
> + public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException {
> + T answer = exchange.getIn().getHeader(name, type);
> + if (answer == null) {
> + answer = exchange.getProperty(name, type);
> + }
> + return answer;
> + }
> +
> + /**
> * Returns the mandatory inbound message body of the correct type or throws
> * an exception if it is not present
> *
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
> new file mode 100644
> index 0000000..4f8f845
> --- /dev/null
> +++ b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
> @@ -0,0 +1,43 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.util.function;
> +
> +import java.util.Objects;
> +import java.util.concurrent.atomic.AtomicReference;
> +import java.util.function.Supplier;
> +
> +public final class Suppliers {
> + private Suppliers() {
> + }
> +
> + public static <T> Supplier<T> memorize(Supplier<T> supplier) {
> + final AtomicReference<T> valueHolder = new AtomicReference<>();
> + return () -> {
> + T supplied = valueHolder.get();
> + if (supplied == null) {
> + synchronized (valueHolder) {
> + supplied = valueHolder.get();
> + if (supplied == null) {
> + supplied = Objects.requireNonNull(supplier.get(), "Supplier should not return null");
> + valueHolder.lazySet(supplied);
> + }
> + }
> + }
> + return supplied;
> + };
> + }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> index 43bd8ff..ee14ca0 100644
> --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> @@ -90,13 +90,18 @@ public class DefaultExchangeTest extends ExchangeTestSupport {
> assertEquals(new Integer(123), exchange.getIn().getHeader("bar", Integer.class));
> assertEquals("123", exchange.getIn().getHeader("bar", String.class));
> assertEquals(123, exchange.getIn().getHeader("bar", 234));
> + assertEquals(123, exchange.getIn().getHeader("bar", () -> 456));
> + assertEquals(456, exchange.getIn().getHeader("baz", () -> 456));
>
> assertEquals(123, exchange.getIn().getHeader("bar", 234));
> assertEquals(new Integer(123), exchange.getIn().getHeader("bar", 234, Integer.class));
> assertEquals("123", exchange.getIn().getHeader("bar", "234", String.class));
> + assertEquals("123", exchange.getIn().getHeader("bar", () -> "456", String.class));
> + assertEquals("456", exchange.getIn().getHeader("baz", () -> "456", String.class));
>
> assertEquals(234, exchange.getIn().getHeader("cheese", 234));
> assertEquals("234", exchange.getIn().getHeader("cheese", 234, String.class));
> + assertEquals("456", exchange.getIn().getHeader("cheese", () -> 456, String.class));
> }
>
> public void testProperty() throws Exception {
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
> new file mode 100644
> index 0000000..4f68bc0
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
> @@ -0,0 +1,58 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.Exchange;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.util.ExchangeHelper;
> +
> +public class DynamicRouter4Test extends ContextTestSupport {
> + public void testDynamicRouter() throws Exception {
> + getMockEndpoint("mock:a").expectedMessageCount(1);
> + getMockEndpoint("mock:b").expectedMessageCount(1);
> + getMockEndpoint("mock:c").expectedMessageCount(1);
> +
> + template.sendBody("direct:start-1", "Hello World");
> +
> + assertMockEndpointsSatisfied();
> + }
> +
> + @Override
> + protected RouteBuilder createRouteBuilder() throws Exception {
> + return new RouteBuilder() {
> + @Override
> + public void configure() throws Exception {
> + from("direct:start-1")
> + .dynamicRouter()
> + .exchange(DynamicRouter4Test::slip);
> + }
> + };
> + }
> +
> + public static String slip(Exchange exchange) {
> + String previous = ExchangeHelper.getHeaderOrProperty(exchange, Exchange.SLIP_ENDPOINT, String.class);
> + if (previous == null) {
> + return "mock:a,mock:b";
> + } else if ("mock://b".equals(previous)) {
> + return "mock:c";
> + }
> +
> + // no more so return null
> + return null;
> + }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
> new file mode 100644
> index 0000000..9afd3f9
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
> @@ -0,0 +1,53 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
> +
> +public class IdempotentConsumerDslTest extends ContextTestSupport {
> +
> + public void testDuplicateMessages() throws Exception {
> + MockEndpoint mock = getMockEndpoint("mock:result");
> + mock.expectedBodiesReceived("one", "two", "three");
> +
> + template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
> + template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
> + template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
> + template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
> + template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
> + template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
> +
> + mock.assertIsSatisfied();
> + }
> +
> + @Override
> + protected RouteBuilder createRouteBuilder() throws Exception {
> + return new RouteBuilder() {
> + @Override
> + public void configure() {
> + from("direct:start")
> + .idempotentConsumer()
> + .message(m -> m.getHeader("messageId"))
> + .messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200))
> + .to("mock:result");
> + }
> + };
> + }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> index 2ef927a..54b5f6a 100644
> --- a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> +++ b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> @@ -21,11 +21,20 @@ import org.apache.camel.builder.RouteBuilder;
>
> public class LoopDoWhileTest extends ContextTestSupport {
>
> - public void testLoopDoWhile() throws Exception {
> + public void testLoopDoWhileSimple() throws Exception {
> getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
> getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
>
> - template.sendBody("direct:start", "A");
> + template.sendBody("direct:simple", "A");
> +
> + assertMockEndpointsSatisfied();
> + }
> +
> + public void testLoopDoWhileFunctional() throws Exception {
> + getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
> + getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
> +
> + template.sendBody("direct:functional", "A");
>
> assertMockEndpointsSatisfied();
> }
> @@ -35,12 +44,20 @@ public class LoopDoWhileTest extends ContextTestSupport {
> return new RouteBuilder() {
> @Override
> public void configure() throws Exception {
> - from("direct:start")
> + from("direct:simple")
> .loopDoWhile(simple("${body.length} <= 5"))
> .to("mock:loop")
> .transform(body().append("A"))
> .end()
> .to("mock:result");
> + from("direct:functional")
> + .loopDoWhile()
> + .body(String.class, b -> b.length() <= 5)
> + .to("mock:loop")
> + .transform()
> + .body(String.class, b -> b += "A")
> + .end()
> + .to("mock:result");
> }
> };
> }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
> new file mode 100644
> index 0000000..a800e36
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
> @@ -0,0 +1,69 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class MulticastDslTest extends ContextTestSupport {
> + public void testMulticastDsl() throws Exception {
> + MockEndpoint mock = getMockEndpoint("mock:result");
> + mock.expectedMessageCount(1);
> + mock.expectedHeaderReceived("onPrepare", true);
> + mock.expectedBodiesReceived(5);
> +
> + template.sendBody("direct:start", 1);
> +
> + mock.assertIsSatisfied();
> + }
> +
> + @Override
> + protected RouteBuilder createRouteBuilder() throws Exception {
> + return new RouteBuilder() {
> + @Override
> + public void configure() throws Exception {
> + from("direct:start")
> + .multicast()
> + .onPrepare()
> + .message(m -> m.setHeader("onPrepare", true))
> + .aggregationStrategy()
> + .body(Integer.class, (o, n) -> o != null ? o + n : n)
> + .to("direct:increase-by-1")
> + .to("direct:increase-by-2")
> + .end()
> + .to("mock:result");
> +
> + from("direct:increase-by-1")
> + .bean(new Increase(1));
> + from("direct:increase-by-2")
> + .bean(new Increase(2));
> + }
> + };
> + }
> +
> + public static class Increase {
> + private final int amount;
> + public Increase(int amount) {
> + this.amount = amount;
> + }
> +
> + public int add(int num) {
> + return num + amount;
> + }
> + }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
> new file mode 100644
> index 0000000..9d296b8
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
> @@ -0,0 +1,49 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class RoutingSlipDslTest extends ContextTestSupport {
> +
> + public void testRoutingSlipDsl() throws Exception {
> + MockEndpoint x = getMockEndpoint("mock:x");
> + MockEndpoint y = getMockEndpoint("mock:y");
> + MockEndpoint z = getMockEndpoint("mock:z");
> +
> + x.expectedBodiesReceived("foo", "bar");
> + y.expectedBodiesReceived("foo", "bar");
> + z.expectedBodiesReceived("foo", "bar");
> +
> + template.sendBodyAndHeader("direct:a", "foo", "recipientListHeader", "mock:x,mock:y,mock:z");
> + template.sendBodyAndHeader("direct:a", "bar", "recipientListHeader", "mock:x,mock:y,mock:z");
> +
> + assertMockEndpointsSatisfied();
> + }
> +
> + protected RouteBuilder createRouteBuilder() {
> + return new RouteBuilder() {
> + public void configure() {
> + from("direct:a").routingSlip()
> + .message(m -> m.getHeader("recipientListHeader", String.class).split(","))
> + .end();
> + }
> + };
> + }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
> new file mode 100644
> index 0000000..a971332
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
> @@ -0,0 +1,72 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import java.util.concurrent.ExecutorService;
> +import java.util.concurrent.Executors;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class ThrottlerDslTest extends ContextTestSupport {
> + private static final int INTERVAL = 500;
> + protected int messageCount = 9;
> +
> + protected boolean canTest() {
> + // skip test on windows as it does not run well there
> + return !isPlatform("windows");
> + }
> +
> + public void testDsl() throws Exception {
> + if (!canTest()) {
> + return;
> + }
> +
> + MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
> + resultEndpoint.expectedMessageCount(messageCount);
> +
> + ExecutorService executor = Executors.newFixedThreadPool(messageCount);
> +
> + long start = System.currentTimeMillis();
> + for (int i = 0; i < messageCount; i++) {
> + executor.execute(() -> template.sendBodyAndHeader("direct:start", "payload", "ThrottleCount", 1));
> + }
> +
> + // let's wait for the exchanges to arrive
> + resultEndpoint.assertIsSatisfied();
> +
> + // now assert that they have actually been throttled
> + long minimumTime = (messageCount - 1) * INTERVAL;
> + // add a little slack
> + long delta = System.currentTimeMillis() - start + 200;
> + assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
> + executor.shutdownNow();
> + }
> +
> + protected RouteBuilder createRouteBuilder() {
> + return new RouteBuilder() {
> + public void configure() {
> + from("direct:start")
> + .throttle()
> + .message(m -> m.getHeader("ThrottleCount", Integer.class))
> + .timePeriodMillis(INTERVAL)
> + .to("log:result", "mock:result");
> + }
> + };
> + }
> +}
> \ No newline at end of file
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> index 55fd14e..f8d1db4 100644
> --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> @@ -21,20 +21,21 @@ import java.util.stream.Collectors;
> import java.util.stream.Stream;
>
> import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.Exchange;
> import org.apache.camel.builder.RouteBuilder;
> -import org.apache.camel.component.mock.MockEndpoint;
>
> public class AggregateDslTest extends ContextTestSupport {
>
> public void testAggregate() throws Exception {
> - MockEndpoint mock = getMockEndpoint("mock:aggregated");
> - mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
> + getMockEndpoint("mock:aggregated").expectedBodiesReceived("0,3", "1,4", "2,5");
> + getMockEndpoint("mock:aggregated-supplier").expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
>
> for (int i = 0; i < 9; i++) {
> template.sendBodyAndHeader("direct:start", i, "type", i % 3);
> + template.sendBodyAndHeader("direct:start-supplier", i, "type", i % 3);
> }
>
> - mock.assertIsSatisfied();
> + assertMockEndpointsSatisfied();
> }
>
> @Override
> @@ -46,12 +47,38 @@ public class AggregateDslTest extends ContextTestSupport {
> .aggregate()
> .message(m -> m.getHeader("type"))
> .strategy()
> - .body(String.class, (o, n) -> Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(",")))
> + .body(String.class, AggregateDslTest::joinString)
> .completion()
> - .body(String.class, s -> s.length() == 5)
> - .to("mock:aggregated");
> + .body(String.class, s -> s.split(",").length == 2)
> + .to("mock:aggregated");
> +
> + from("direct:start-supplier")
> + .aggregate()
> + .header("type")
> + .strategy(AggregateDslTest::joinStringStrategy)
> + .completion()
> + .body(String.class, s -> s.split(",").length == 3)
> + .to("mock:aggregated-supplier");
> }
> };
> }
> +
> + // *************************************************************************
> + // Strategies
> + // *************************************************************************
> +
> + private static String joinString(String o, String n) {
> + return Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(","));
> + }
> +
> + private static Exchange joinStringStrategy(Exchange oldExchange, Exchange newExchange) {
> + newExchange.getIn().setBody(
> + joinString(
> + oldExchange != null ? oldExchange.getIn().getBody(String.class) : null,
> + newExchange.getIn().getBody(String.class))
> + );
> +
> + return newExchange;
> + }
> }
>
>
--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
Re: camel git commit: CAMEL-10724: Improve Java DSL support for Java 8
Posted by Luca Burgazzoli <lb...@gmail.com>.
Should be fixed now
---
Luca Burgazzoli
On Thu, Feb 16, 2017 at 8:01 PM, Claus Ibsen <cl...@gmail.com> wrote:
> Hi Luca
>
> This happens also for me. There is a little secret that if you change
> the Message API then the Scala DSL needs to be changed accordingly.
>
> The camel-scala module cannot be compiled now.
>
> On Thu, Feb 16, 2017 at 6:10 PM, <lb...@apache.org> wrote:
>> Repository: camel
>> Updated Branches:
>> refs/heads/master 8e0e3083e -> 8bc8484b1
>>
>>
>> CAMEL-10724: Improve Java DSL support for Java 8
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bc8484b
>> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bc8484b
>> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bc8484b
>>
>> Branch: refs/heads/master
>> Commit: 8bc8484b1914f5cb29191e7b91fe48e02ca1f636
>> Parents: 8e0e308
>> Author: lburgazzoli <lb...@gmail.com>
>> Authored: Wed Jan 18 18:09:08 2017 +0100
>> Committer: lburgazzoli <lb...@gmail.com>
>> Committed: Thu Feb 16 18:10:22 2017 +0100
>>
>> ----------------------------------------------------------------------
>> .../src/main/java/org/apache/camel/Message.java | 16 ++++-
>> .../org/apache/camel/impl/DefaultMessage.java | 31 +++++++++
>> .../apache/camel/model/AggregateDefinition.java | 20 ++++++
>> .../model/IdempotentConsumerDefinition.java | 9 +--
>> .../apache/camel/model/MulticastDefinition.java | 26 +++++++
>> .../apache/camel/model/ProcessorDefinition.java | 66 +++++++++++++++++-
>> .../org/apache/camel/util/ExchangeHelper.java | 18 +++++
>> .../apache/camel/util/function/Suppliers.java | 43 ++++++++++++
>> .../apache/camel/impl/DefaultExchangeTest.java | 5 ++
>> .../camel/processor/DynamicRouter4Test.java | 58 ++++++++++++++++
>> .../processor/IdempotentConsumerDslTest.java | 53 ++++++++++++++
>> .../apache/camel/processor/LoopDoWhileTest.java | 23 ++++++-
>> .../camel/processor/MulticastDslTest.java | 69 +++++++++++++++++++
>> .../camel/processor/RoutingSlipDslTest.java | 49 +++++++++++++
>> .../camel/processor/ThrottlerDslTest.java | 72 ++++++++++++++++++++
>> .../processor/aggregator/AggregateDslTest.java | 41 +++++++++--
>> 16 files changed, 582 insertions(+), 17 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/Message.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/Message.java b/camel-core/src/main/java/org/apache/camel/Message.java
>> index 8cfeae0..a0e9c4d 100644
>> --- a/camel-core/src/main/java/org/apache/camel/Message.java
>> +++ b/camel-core/src/main/java/org/apache/camel/Message.java
>> @@ -18,7 +18,7 @@ package org.apache.camel;
>>
>> import java.util.Map;
>> import java.util.Set;
>> -
>> +import java.util.function.Supplier;
>> import javax.activation.DataHandler;
>>
>> /**
>> @@ -88,6 +88,13 @@ public interface Message {
>> Object getHeader(String name, Object defaultValue);
>>
>> /**
>> + * TODO: document
>> + * Note: this is experimental and subject to changes in future releases.
>> + *
>> + */
>> + Object getHeader(String name, Supplier<Object> defaultValueSupplier);
>> +
>> + /**
>> * Returns a header associated with this message by name and specifying the
>> * type required
>> *
>> @@ -112,6 +119,13 @@ public interface Message {
>> <T> T getHeader(String name, Object defaultValue, Class<T> type);
>>
>> /**
>> + * TODO: document
>> + * Note: this is experimental and subject to changes in future releases.
>> + *
>> + */
>> + <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type);
>> +
>> + /**
>> * Sets a header on the message
>> *
>> * @param name of the header
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> index 848586a..1e49766 100644
>> --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> @@ -20,6 +20,7 @@ import java.util.HashSet;
>> import java.util.LinkedHashMap;
>> import java.util.Map;
>> import java.util.Set;
>> +import java.util.function.Supplier;
>> import javax.activation.DataHandler;
>>
>> import org.apache.camel.Attachment;
>> @@ -65,6 +66,11 @@ public class DefaultMessage extends MessageSupport {
>> return answer != null ? answer : defaultValue;
>> }
>>
>> + public Object getHeader(String name, Supplier<Object> defaultValueSupplier) {
>> + Object answer = getHeaders().get(name);
>> + return answer != null ? answer : defaultValueSupplier.get();
>> + }
>> +
>> @SuppressWarnings("unchecked")
>> public <T> T getHeader(String name, Class<T> type) {
>> Object value = getHeader(name);
>> @@ -115,6 +121,31 @@ public class DefaultMessage extends MessageSupport {
>> }
>> }
>>
>> + @SuppressWarnings("unchecked")
>> + public <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type) {
>> + Object value = getHeader(name, defaultValueSupplier);
>> + if (value == null) {
>> + // lets avoid NullPointerException when converting to boolean for null values
>> + if (boolean.class.isAssignableFrom(type)) {
>> + return (T) Boolean.FALSE;
>> + }
>> + return null;
>> + }
>> +
>> + // eager same instance type test to avoid the overhead of invoking the type converter
>> + // if already same type
>> + if (type.isInstance(value)) {
>> + return type.cast(value);
>> + }
>> +
>> + Exchange e = getExchange();
>> + if (e != null) {
>> + return e.getContext().getTypeConverter().convertTo(type, e, value);
>> + } else {
>> + return type.cast(value);
>> + }
>> + }
>> +
>> public void setHeader(String name, Object value) {
>> if (headers == null) {
>> headers = createHeaders();
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> index ec7d396..12f0a13 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> @@ -814,6 +814,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>> }
>>
>> /**
>> + * TODO: document
>> + * Note: this is experimental and subject to changes in future releases.
>> + *
>> + * @return the builder
>> + */
>> + public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
>> + return aggregationStrategy(aggregationStrategy);
>> + }
>> +
>> + /**
>> * Sets the aggregate strategy to use
>> *
>> * @param aggregationStrategy the aggregate strategy to use
>> @@ -930,6 +940,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>> }
>>
>> /**
>> + * TODO: document
>> + * Note: this is experimental and subject to changes in future releases.
>> + *
>> + * @return the builder
>> + */
>> + public AggregateDefinition completion(@AsPredicate Predicate predicate) {
>> + return completionPredicate(predicate);
>> + }
>> +
>> + /**
>> * Indicates to complete all current aggregated exchanges when the context is stopped
>> */
>> public AggregateDefinition forceCompletionOnStop() {
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> index 256394d..9a58704 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> @@ -221,8 +221,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>> public Processor createProcessor(RouteContext routeContext) throws Exception {
>> Processor childProcessor = this.createChildProcessor(routeContext, true);
>>
>> - IdempotentRepository<String> idempotentRepository =
>> - (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
>> + IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext);
>> ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
>>
>> Expression expression = getExpression().createExpression(routeContext);
>> @@ -231,6 +230,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>> boolean eager = getEager() == null || getEager();
>> boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
>> boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
>> +
>> // these boolean should be false by default
>> boolean completionEager = getCompletionEager() != null && getCompletionEager();
>>
>> @@ -243,10 +243,11 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>> * @param routeContext route context
>> * @return the repository
>> */
>> - protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
>> + @SuppressWarnings("unchecked")
>> + protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) {
>> if (messageIdRepositoryRef != null) {
>> idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
>> }
>> - return idempotentRepository;
>> + return (IdempotentRepository<T>)idempotentRepository;
>> }
>> }
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> index 7bff217..37efcc6 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> @@ -27,6 +27,8 @@ import javax.xml.bind.annotation.XmlTransient;
>>
>> import org.apache.camel.CamelContextAware;
>> import org.apache.camel.Processor;
>> +import org.apache.camel.builder.AggregationStrategyClause;
>> +import org.apache.camel.builder.ProcessClause;
>> import org.apache.camel.processor.MulticastProcessor;
>> import org.apache.camel.processor.aggregate.AggregationStrategy;
>> import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
>> @@ -106,6 +108,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
>> // -------------------------------------------------------------------------
>>
>> /**
>> + * TODO: document
>> + * Note: this is experimental and subject to changes in future releases.
>> + *
>> + * @return the builder
>> + */
>> + public AggregationStrategyClause<MulticastDefinition> aggregationStrategy() {
>> + AggregationStrategyClause<MulticastDefinition> clause = new AggregationStrategyClause<>(this);
>> + setAggregationStrategy(clause);
>> + return clause;
>> + }
>> +
>> + /**
>> * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
>> * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
>> * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
>> @@ -248,6 +262,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
>> }
>>
>> /**
>> + * TODO: document
>> + * Note: this is experimental and subject to changes in future releases.
>> + *
>> + * @return the builder
>> + */
>> + public ProcessClause<MulticastDefinition> onPrepare() {
>> + ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this);
>> + setOnPrepare(clause);
>> + return clause;
>> + }
>> +
>> + /**
>> * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
>> * This can be used to deep-clone messages that should be sent, or any custom logic needed before
>> * the exchange is sent.
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> index 21fbe2e..c40a0bd 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> @@ -28,6 +28,7 @@ import java.util.Map;
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.TimeUnit;
>> import java.util.concurrent.atomic.AtomicInteger;
>> +import java.util.function.Supplier;
>> import javax.xml.bind.annotation.XmlAccessType;
>> import javax.xml.bind.annotation.XmlAccessorType;
>> import javax.xml.bind.annotation.XmlAnyAttribute;
>> @@ -73,6 +74,7 @@ import org.apache.camel.spi.InterceptStrategy;
>> import org.apache.camel.spi.LifecycleStrategy;
>> import org.apache.camel.spi.Policy;
>> import org.apache.camel.spi.RouteContext;
>> +import org.apache.camel.support.ExpressionAdapter;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> @@ -1408,6 +1410,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>> }
>>
>> /**
>> + * TODO: document
>> + * Note: this is experimental and subject to changes in future releases.
>> + *
>> + * @return the builder
>> + */
>> + public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer() {
>> + IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
>> + addOutput(answer);
>> +
>> + return ExpressionClause.createAndSetExpression(answer);
>> + }
>> +
>> + /**
>> * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
>> * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
>> * to avoid duplicate messages
>> @@ -2096,7 +2111,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>> */
>> public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
>> AggregateDefinition answer = new AggregateDefinition();
>> - ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(answer);
>> + ExpressionClause<AggregateDefinition> clause = new ExpressionClause<>(answer);
>> answer.setExpression(clause);
>> answer.setAggregationStrategy(aggregationStrategy);
>> addOutput(answer);
>> @@ -2173,6 +2188,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>> }
>>
>> /**
>> + * TODO: document
>> + * Note: this is experimental and subject to changes in future releases.
>> + *
>> + * @return the builder
>> + */
>> + public ExpressionClause<ThrottleDefinition> throttle() {
>> + ThrottleDefinition answer = new ThrottleDefinition();
>> + addOutput(answer);
>> +
>> + return ExpressionClause.createAndSetExpression(answer);
>> + }
>> +
>> + /**
>> * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
>> * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
>> * or that we don't exceed an agreed SLA with some external service.
>> @@ -2246,6 +2274,21 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>> }
>>
>> /**
>> + * TODO: document
>> + * Note: this is experimental and subject to changes in future releases.
>> + *
>> + * @return the builder
>> + */
>> + public ExpressionClause<LoopDefinition> loopDoWhile() {
>> + LoopDefinition loop = new LoopDefinition();
>> + loop.setDoWhile(true);
>> +
>> + addOutput(loop);
>> +
>> + return ExpressionClause.createAndSetExpression(loop);
>> + }
>> +
>> + /**
>> * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
>> * Creates a loop allowing to process a message a number of times and possibly process them
>> * in a different way.
>> @@ -3094,6 +3137,26 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>> }
>>
>> /**
>> + * Adds a processor which sets the header on the IN message
>> + *
>> + * @param name the header name
>> + * @param supplier the supplier used to set the header
>> + * @return the builder
>> + */
>> + @SuppressWarnings("unchecked")
>> + public Type setHeader(String name, final Supplier<Object> supplier) {
>> + SetHeaderDefinition answer = new SetHeaderDefinition(name, new ExpressionAdapter() {
>> + @Override
>> + public Object evaluate(Exchange exchange) {
>> + return supplier.get();
>> + }
>> + });
>> +
>> + addOutput(answer);
>> + return (Type) this;
>> + }
>> +
>> + /**
>> * Adds a processor which sets the header on the OUT message
>> *
>> * @param name the header name
>> @@ -4021,5 +4084,4 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>> public String getLabel() {
>> return "";
>> }
>> -
>> }
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> index ce3fdca..6e5967e 100644
>> --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> @@ -130,6 +130,24 @@ public final class ExchangeHelper {
>> }
>>
>> /**
>> + * Gets a header or property of the correct type
>> + *
>> + * @param exchange the exchange
>> + * @param name the name of the header or the property
>> + * @param type the type
>> + * @return the header or property value
>> + * @throws TypeConversionException is thrown if error during type conversion
>> + * @throws NoSuchHeaderException is thrown if no header exists
>> + */
>> + public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException {
>> + T answer = exchange.getIn().getHeader(name, type);
>> + if (answer == null) {
>> + answer = exchange.getProperty(name, type);
>> + }
>> + return answer;
>> + }
>> +
>> + /**
>> * Returns the mandatory inbound message body of the correct type or throws
>> * an exception if it is not present
>> *
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
>> new file mode 100644
>> index 0000000..4f8f845
>> --- /dev/null
>> +++ b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
>> @@ -0,0 +1,43 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.util.function;
>> +
>> +import java.util.Objects;
>> +import java.util.concurrent.atomic.AtomicReference;
>> +import java.util.function.Supplier;
>> +
>> +public final class Suppliers {
>> + private Suppliers() {
>> + }
>> +
>> + public static <T> Supplier<T> memorize(Supplier<T> supplier) {
>> + final AtomicReference<T> valueHolder = new AtomicReference<>();
>> + return () -> {
>> + T supplied = valueHolder.get();
>> + if (supplied == null) {
>> + synchronized (valueHolder) {
>> + supplied = valueHolder.get();
>> + if (supplied == null) {
>> + supplied = Objects.requireNonNull(supplier.get(), "Supplier should not return null");
>> + valueHolder.lazySet(supplied);
>> + }
>> + }
>> + }
>> + return supplied;
>> + };
>> + }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> index 43bd8ff..ee14ca0 100644
>> --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> @@ -90,13 +90,18 @@ public class DefaultExchangeTest extends ExchangeTestSupport {
>> assertEquals(new Integer(123), exchange.getIn().getHeader("bar", Integer.class));
>> assertEquals("123", exchange.getIn().getHeader("bar", String.class));
>> assertEquals(123, exchange.getIn().getHeader("bar", 234));
>> + assertEquals(123, exchange.getIn().getHeader("bar", () -> 456));
>> + assertEquals(456, exchange.getIn().getHeader("baz", () -> 456));
>>
>> assertEquals(123, exchange.getIn().getHeader("bar", 234));
>> assertEquals(new Integer(123), exchange.getIn().getHeader("bar", 234, Integer.class));
>> assertEquals("123", exchange.getIn().getHeader("bar", "234", String.class));
>> + assertEquals("123", exchange.getIn().getHeader("bar", () -> "456", String.class));
>> + assertEquals("456", exchange.getIn().getHeader("baz", () -> "456", String.class));
>>
>> assertEquals(234, exchange.getIn().getHeader("cheese", 234));
>> assertEquals("234", exchange.getIn().getHeader("cheese", 234, String.class));
>> + assertEquals("456", exchange.getIn().getHeader("cheese", () -> 456, String.class));
>> }
>>
>> public void testProperty() throws Exception {
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
>> new file mode 100644
>> index 0000000..4f68bc0
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
>> @@ -0,0 +1,58 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.Exchange;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.util.ExchangeHelper;
>> +
>> +public class DynamicRouter4Test extends ContextTestSupport {
>> + public void testDynamicRouter() throws Exception {
>> + getMockEndpoint("mock:a").expectedMessageCount(1);
>> + getMockEndpoint("mock:b").expectedMessageCount(1);
>> + getMockEndpoint("mock:c").expectedMessageCount(1);
>> +
>> + template.sendBody("direct:start-1", "Hello World");
>> +
>> + assertMockEndpointsSatisfied();
>> + }
>> +
>> + @Override
>> + protected RouteBuilder createRouteBuilder() throws Exception {
>> + return new RouteBuilder() {
>> + @Override
>> + public void configure() throws Exception {
>> + from("direct:start-1")
>> + .dynamicRouter()
>> + .exchange(DynamicRouter4Test::slip);
>> + }
>> + };
>> + }
>> +
>> + public static String slip(Exchange exchange) {
>> + String previous = ExchangeHelper.getHeaderOrProperty(exchange, Exchange.SLIP_ENDPOINT, String.class);
>> + if (previous == null) {
>> + return "mock:a,mock:b";
>> + } else if ("mock://b".equals(previous)) {
>> + return "mock:c";
>> + }
>> +
>> + // no more so return null
>> + return null;
>> + }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
>> new file mode 100644
>> index 0000000..9afd3f9
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
>> @@ -0,0 +1,53 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
>> +
>> +public class IdempotentConsumerDslTest extends ContextTestSupport {
>> +
>> + public void testDuplicateMessages() throws Exception {
>> + MockEndpoint mock = getMockEndpoint("mock:result");
>> + mock.expectedBodiesReceived("one", "two", "three");
>> +
>> + template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
>> + template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
>> + template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
>> + template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
>> + template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
>> + template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
>> +
>> + mock.assertIsSatisfied();
>> + }
>> +
>> + @Override
>> + protected RouteBuilder createRouteBuilder() throws Exception {
>> + return new RouteBuilder() {
>> + @Override
>> + public void configure() {
>> + from("direct:start")
>> + .idempotentConsumer()
>> + .message(m -> m.getHeader("messageId"))
>> + .messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200))
>> + .to("mock:result");
>> + }
>> + };
>> + }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> index 2ef927a..54b5f6a 100644
>> --- a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> @@ -21,11 +21,20 @@ import org.apache.camel.builder.RouteBuilder;
>>
>> public class LoopDoWhileTest extends ContextTestSupport {
>>
>> - public void testLoopDoWhile() throws Exception {
>> + public void testLoopDoWhileSimple() throws Exception {
>> getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
>> getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
>>
>> - template.sendBody("direct:start", "A");
>> + template.sendBody("direct:simple", "A");
>> +
>> + assertMockEndpointsSatisfied();
>> + }
>> +
>> + public void testLoopDoWhileFunctional() throws Exception {
>> + getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
>> + getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
>> +
>> + template.sendBody("direct:functional", "A");
>>
>> assertMockEndpointsSatisfied();
>> }
>> @@ -35,12 +44,20 @@ public class LoopDoWhileTest extends ContextTestSupport {
>> return new RouteBuilder() {
>> @Override
>> public void configure() throws Exception {
>> - from("direct:start")
>> + from("direct:simple")
>> .loopDoWhile(simple("${body.length} <= 5"))
>> .to("mock:loop")
>> .transform(body().append("A"))
>> .end()
>> .to("mock:result");
>> + from("direct:functional")
>> + .loopDoWhile()
>> + .body(String.class, b -> b.length() <= 5)
>> + .to("mock:loop")
>> + .transform()
>> + .body(String.class, b -> b += "A")
>> + .end()
>> + .to("mock:result");
>> }
>> };
>> }
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
>> new file mode 100644
>> index 0000000..a800e36
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
>> @@ -0,0 +1,69 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +
>> +public class MulticastDslTest extends ContextTestSupport {
>> + public void testMulticastDsl() throws Exception {
>> + MockEndpoint mock = getMockEndpoint("mock:result");
>> + mock.expectedMessageCount(1);
>> + mock.expectedHeaderReceived("onPrepare", true);
>> + mock.expectedBodiesReceived(5);
>> +
>> + template.sendBody("direct:start", 1);
>> +
>> + mock.assertIsSatisfied();
>> + }
>> +
>> + @Override
>> + protected RouteBuilder createRouteBuilder() throws Exception {
>> + return new RouteBuilder() {
>> + @Override
>> + public void configure() throws Exception {
>> + from("direct:start")
>> + .multicast()
>> + .onPrepare()
>> + .message(m -> m.setHeader("onPrepare", true))
>> + .aggregationStrategy()
>> + .body(Integer.class, (o, n) -> o != null ? o + n : n)
>> + .to("direct:increase-by-1")
>> + .to("direct:increase-by-2")
>> + .end()
>> + .to("mock:result");
>> +
>> + from("direct:increase-by-1")
>> + .bean(new Increase(1));
>> + from("direct:increase-by-2")
>> + .bean(new Increase(2));
>> + }
>> + };
>> + }
>> +
>> + public static class Increase {
>> + private final int amount;
>> + public Increase(int amount) {
>> + this.amount = amount;
>> + }
>> +
>> + public int add(int num) {
>> + return num + amount;
>> + }
>> + }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
>> new file mode 100644
>> index 0000000..9d296b8
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
>> @@ -0,0 +1,49 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +
>> +public class RoutingSlipDslTest extends ContextTestSupport {
>> +
>> + public void testRoutingSlipDsl() throws Exception {
>> + MockEndpoint x = getMockEndpoint("mock:x");
>> + MockEndpoint y = getMockEndpoint("mock:y");
>> + MockEndpoint z = getMockEndpoint("mock:z");
>> +
>> + x.expectedBodiesReceived("foo", "bar");
>> + y.expectedBodiesReceived("foo", "bar");
>> + z.expectedBodiesReceived("foo", "bar");
>> +
>> + template.sendBodyAndHeader("direct:a", "foo", "recipientListHeader", "mock:x,mock:y,mock:z");
>> + template.sendBodyAndHeader("direct:a", "bar", "recipientListHeader", "mock:x,mock:y,mock:z");
>> +
>> + assertMockEndpointsSatisfied();
>> + }
>> +
>> + protected RouteBuilder createRouteBuilder() {
>> + return new RouteBuilder() {
>> + public void configure() {
>> + from("direct:a").routingSlip()
>> + .message(m -> m.getHeader("recipientListHeader", String.class).split(","))
>> + .end();
>> + }
>> + };
>> + }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
>> new file mode 100644
>> index 0000000..a971332
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
>> @@ -0,0 +1,72 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import java.util.concurrent.ExecutorService;
>> +import java.util.concurrent.Executors;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +
>> +public class ThrottlerDslTest extends ContextTestSupport {
>> + private static final int INTERVAL = 500;
>> + protected int messageCount = 9;
>> +
>> + protected boolean canTest() {
>> + // skip test on windows as it does not run well there
>> + return !isPlatform("windows");
>> + }
>> +
>> + public void testDsl() throws Exception {
>> + if (!canTest()) {
>> + return;
>> + }
>> +
>> + MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
>> + resultEndpoint.expectedMessageCount(messageCount);
>> +
>> + ExecutorService executor = Executors.newFixedThreadPool(messageCount);
>> +
>> + long start = System.currentTimeMillis();
>> + for (int i = 0; i < messageCount; i++) {
>> + executor.execute(() -> template.sendBodyAndHeader("direct:start", "payload", "ThrottleCount", 1));
>> + }
>> +
>> + // let's wait for the exchanges to arrive
>> + resultEndpoint.assertIsSatisfied();
>> +
>> + // now assert that they have actually been throttled
>> + long minimumTime = (messageCount - 1) * INTERVAL;
>> + // add a little slack
>> + long delta = System.currentTimeMillis() - start + 200;
>> + assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
>> + executor.shutdownNow();
>> + }
>> +
>> + protected RouteBuilder createRouteBuilder() {
>> + return new RouteBuilder() {
>> + public void configure() {
>> + from("direct:start")
>> + .throttle()
>> + .message(m -> m.getHeader("ThrottleCount", Integer.class))
>> + .timePeriodMillis(INTERVAL)
>> + .to("log:result", "mock:result");
>> + }
>> + };
>> + }
>> +}
>> \ No newline at end of file
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> index 55fd14e..f8d1db4 100644
>> --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> @@ -21,20 +21,21 @@ import java.util.stream.Collectors;
>> import java.util.stream.Stream;
>>
>> import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.Exchange;
>> import org.apache.camel.builder.RouteBuilder;
>> -import org.apache.camel.component.mock.MockEndpoint;
>>
>> public class AggregateDslTest extends ContextTestSupport {
>>
>> public void testAggregate() throws Exception {
>> - MockEndpoint mock = getMockEndpoint("mock:aggregated");
>> - mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
>> + getMockEndpoint("mock:aggregated").expectedBodiesReceived("0,3", "1,4", "2,5");
>> + getMockEndpoint("mock:aggregated-supplier").expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
>>
>> for (int i = 0; i < 9; i++) {
>> template.sendBodyAndHeader("direct:start", i, "type", i % 3);
>> + template.sendBodyAndHeader("direct:start-supplier", i, "type", i % 3);
>> }
>>
>> - mock.assertIsSatisfied();
>> + assertMockEndpointsSatisfied();
>> }
>>
>> @Override
>> @@ -46,12 +47,38 @@ public class AggregateDslTest extends ContextTestSupport {
>> .aggregate()
>> .message(m -> m.getHeader("type"))
>> .strategy()
>> - .body(String.class, (o, n) -> Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(",")))
>> + .body(String.class, AggregateDslTest::joinString)
>> .completion()
>> - .body(String.class, s -> s.length() == 5)
>> - .to("mock:aggregated");
>> + .body(String.class, s -> s.split(",").length == 2)
>> + .to("mock:aggregated");
>> +
>> + from("direct:start-supplier")
>> + .aggregate()
>> + .header("type")
>> + .strategy(AggregateDslTest::joinStringStrategy)
>> + .completion()
>> + .body(String.class, s -> s.split(",").length == 3)
>> + .to("mock:aggregated-supplier");
>> }
>> };
>> }
>> +
>> + // *************************************************************************
>> + // Strategies
>> + // *************************************************************************
>> +
>> + private static String joinString(String o, String n) {
>> + return Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(","));
>> + }
>> +
>> + private static Exchange joinStringStrategy(Exchange oldExchange, Exchange newExchange) {
>> + newExchange.getIn().setBody(
>> + joinString(
>> + oldExchange != null ? oldExchange.getIn().getBody(String.class) : null,
>> + newExchange.getIn().getBody(String.class))
>> + );
>> +
>> + return newExchange;
>> + }
>> }
>>
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>