You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/02/16 17:10:48 UTC

camel git commit: CAMEL-10724: Improve Java DSL support for Java 8

Repository: camel
Updated Branches:
  refs/heads/master 8e0e3083e -> 8bc8484b1


CAMEL-10724: Improve Java DSL support for Java 8


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bc8484b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bc8484b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bc8484b

Branch: refs/heads/master
Commit: 8bc8484b1914f5cb29191e7b91fe48e02ca1f636
Parents: 8e0e308
Author: lburgazzoli <lb...@gmail.com>
Authored: Wed Jan 18 18:09:08 2017 +0100
Committer: lburgazzoli <lb...@gmail.com>
Committed: Thu Feb 16 18:10:22 2017 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/camel/Message.java | 16 ++++-
 .../org/apache/camel/impl/DefaultMessage.java   | 31 +++++++++
 .../apache/camel/model/AggregateDefinition.java | 20 ++++++
 .../model/IdempotentConsumerDefinition.java     |  9 +--
 .../apache/camel/model/MulticastDefinition.java | 26 +++++++
 .../apache/camel/model/ProcessorDefinition.java | 66 +++++++++++++++++-
 .../org/apache/camel/util/ExchangeHelper.java   | 18 +++++
 .../apache/camel/util/function/Suppliers.java   | 43 ++++++++++++
 .../apache/camel/impl/DefaultExchangeTest.java  |  5 ++
 .../camel/processor/DynamicRouter4Test.java     | 58 ++++++++++++++++
 .../processor/IdempotentConsumerDslTest.java    | 53 ++++++++++++++
 .../apache/camel/processor/LoopDoWhileTest.java | 23 ++++++-
 .../camel/processor/MulticastDslTest.java       | 69 +++++++++++++++++++
 .../camel/processor/RoutingSlipDslTest.java     | 49 +++++++++++++
 .../camel/processor/ThrottlerDslTest.java       | 72 ++++++++++++++++++++
 .../processor/aggregator/AggregateDslTest.java  | 41 +++++++++--
 16 files changed, 582 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/Message.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Message.java b/camel-core/src/main/java/org/apache/camel/Message.java
index 8cfeae0..a0e9c4d 100644
--- a/camel-core/src/main/java/org/apache/camel/Message.java
+++ b/camel-core/src/main/java/org/apache/camel/Message.java
@@ -18,7 +18,7 @@ package org.apache.camel;
 
 import java.util.Map;
 import java.util.Set;
-
+import java.util.function.Supplier;
 import javax.activation.DataHandler;
 
 /**
@@ -88,6 +88,13 @@ public interface Message {
     Object getHeader(String name, Object defaultValue);
 
     /**
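+     * Returns a header associated with this message by name, or the value provided by the given supplier if the header is missing.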
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     */
+    Object getHeader(String name, Supplier<Object> defaultValueSupplier);
+
+    /**
      * Returns a header associated with this message by name and specifying the
      * type required
      *
@@ -112,6 +119,13 @@ public interface Message {
     <T> T getHeader(String name, Object defaultValue, Class<T> type);
 
     /**
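+     * Returns a header by name converted to the type required, using the value provided by the given supplier if the header is missing.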
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     */
+    <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type);
+
+    /**
      * Sets a header on the message
      *
      * @param name of the header

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
index 848586a..1e49766 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
@@ -20,6 +20,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 import javax.activation.DataHandler;
 
 import org.apache.camel.Attachment;
@@ -65,6 +66,11 @@ public class DefaultMessage extends MessageSupport {
         return answer != null ? answer : defaultValue;
     }
 
+    public Object getHeader(String name, Supplier<Object> defaultValueSupplier) {
+        Object answer = getHeaders().get(name);
+        return answer != null ? answer : defaultValueSupplier.get();
+    }
+
     @SuppressWarnings("unchecked")
     public <T> T getHeader(String name, Class<T> type) {
         Object value = getHeader(name);
@@ -115,6 +121,31 @@ public class DefaultMessage extends MessageSupport {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type) {
+        Object value = getHeader(name, defaultValueSupplier);
+        if (value == null) {
+            // lets avoid NullPointerException when converting to boolean for null values
+            if (boolean.class.isAssignableFrom(type)) {
+                return (T) Boolean.FALSE;
+            }
+            return null;
+        }
+
+        // eager same instance type test to avoid the overhead of invoking the type converter
+        // if already same type
+        if (type.isInstance(value)) {
+            return type.cast(value);
+        }
+
+        Exchange e = getExchange();
+        if (e != null) {
+            return e.getContext().getTypeConverter().convertTo(type, e, value);
+        } else {
+            return type.cast(value);
+        }
+    }
+
     public void setHeader(String name, Object value) {
         if (headers == null) {
             headers = createHeaders();

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index ec7d396..12f0a13 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -814,6 +814,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * Sets the aggregate strategy to use; shorthand for {@link #aggregationStrategy(AggregationStrategy)}.
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
+        return aggregationStrategy(aggregationStrategy);
+    }
+
+    /**
      * Sets the aggregate strategy to use
      *
      * @param aggregationStrategy  the aggregate strategy to use
@@ -930,6 +940,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * Sets the completion predicate; shorthand for {@link #completionPredicate(Predicate)}.
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public AggregateDefinition completion(@AsPredicate Predicate predicate) {
+        return completionPredicate(predicate);
+    }
+
+    /**
      * Indicates to complete all current aggregated exchanges when the context is stopped
      */
     public AggregateDefinition forceCompletionOnStop() {

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
index 256394d..9a58704 100644
--- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
@@ -221,8 +221,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = this.createChildProcessor(routeContext, true);
 
-        IdempotentRepository<String> idempotentRepository =
-                (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
+        IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext);
         ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
 
         Expression expression = getExpression().createExpression(routeContext);
@@ -231,6 +230,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
         boolean eager = getEager() == null || getEager();
         boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
         boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
+
         // these boolean should be false by default
         boolean completionEager = getCompletionEager() != null && getCompletionEager();
 
@@ -243,10 +243,11 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
      * @param routeContext route context
      * @return the repository
      */
-    protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
+    @SuppressWarnings("unchecked")
+    protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) {
         if (messageIdRepositoryRef != null) {
             idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
         }
-        return idempotentRepository;
+        return (IdempotentRepository<T>)idempotentRepository;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 7bff217..37efcc6 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -27,6 +27,8 @@ import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Processor;
+import org.apache.camel.builder.AggregationStrategyClause;
+import org.apache.camel.builder.ProcessClause;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
@@ -106,6 +108,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
     // -------------------------------------------------------------------------
 
     /**
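+     * Sets the AggregationStrategy through a fluent {@link AggregationStrategyClause}, which is returned for further configuration.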
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public AggregationStrategyClause<MulticastDefinition> aggregationStrategy() {
+        AggregationStrategyClause<MulticastDefinition> clause = new AggregationStrategyClause<>(this);
+        setAggregationStrategy(clause);
+        return clause;
+    }
+
+    /**
      * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
      * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
      * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
@@ -248,6 +262,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
     }
 
     /**
+     * Sets the onPrepare processor through a fluent {@link ProcessClause}, which is returned for further configuration.
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public ProcessClause<MulticastDefinition> onPrepare() {
+        ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this);
+        setOnPrepare(clause);
+        return clause;
+    }
+
+    /**
      * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
      * This can be used to deep-clone messages that should be send, or any custom logic needed before
      * the exchange is send.

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 21fbe2e..c40a0bd 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAnyAttribute;
@@ -73,6 +74,7 @@ import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.Policy;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.support.ExpressionAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1408,6 +1410,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     }
 
     /**
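+     * Adds an {@link IdempotentConsumerDefinition} whose expression is configured through the returned expression clause.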
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer() {
+        IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
+        addOutput(answer);
+
+        return ExpressionClause.createAndSetExpression(answer);
+    }
+
+    /**
      * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
      * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
      * to avoid duplicate messages
@@ -2096,7 +2111,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
         AggregateDefinition answer = new AggregateDefinition();
-        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(answer);
+        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<>(answer);
         answer.setExpression(clause);
         answer.setAggregationStrategy(aggregationStrategy);
         addOutput(answer);
@@ -2173,6 +2188,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     }
 
     /**
+     * Adds a {@link ThrottleDefinition} whose expression is configured through the returned expression clause.
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public ExpressionClause<ThrottleDefinition> throttle() {
+        ThrottleDefinition answer = new ThrottleDefinition();
+        addOutput(answer);
+
+        return ExpressionClause.createAndSetExpression(answer);
+    }
+
+    /**
      * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
      * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
      * or that we don't exceed an agreed SLA with some external service.
@@ -2246,6 +2274,21 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     }
 
     /**
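+     * Adds a {@link LoopDefinition} in do-while mode whose expression is configured through the returned expression clause.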
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public ExpressionClause<LoopDefinition> loopDoWhile() {
+        LoopDefinition loop = new LoopDefinition();
+        loop.setDoWhile(true);
+
+        addOutput(loop);
+
+        return ExpressionClause.createAndSetExpression(loop);
+    }
+
+    /**
      * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
      * Creates a loop allowing to process the a message a number of times and possibly process them
      * in a different way.
@@ -3094,6 +3137,26 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     }
 
     /**
+     * Adds a processor which sets the header on the IN message
+     *
+     * @param name  the header name
+     * @param supplier the supplier used to set the header
+     * @return the builder
+     */
+    @SuppressWarnings("unchecked")
+    public Type setHeader(String name, final Supplier<Object> supplier) {
+        SetHeaderDefinition answer = new SetHeaderDefinition(name, new ExpressionAdapter() {
+            @Override
+            public Object evaluate(Exchange exchange) {
+                return supplier.get();
+            }
+        });
+
+        addOutput(answer);
+        return (Type) this;
+    }
+
+    /**
      * Adds a processor which sets the header on the OUT message
      *
      * @param name  the header name
@@ -4021,5 +4084,4 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     public String getLabel() {
         return "";
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index ce3fdca..6e5967e 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -130,6 +130,24 @@ public final class ExchangeHelper {
     }
 
     /**
+     * Gets a header or property of the correct type
+     *
+     * @param exchange      the exchange
+     * @param name          the name of the header or the property
+     * @param type          the type
+     * @return the header value if present, otherwise the property value,
+     *         or <tt>null</tt> if neither the header nor the property exists
+     * @throws TypeConversionException is thrown if error during type conversion
+     */
+    public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException {
+        T answer = exchange.getIn().getHeader(name, type);
+        if (answer == null) {
+            answer = exchange.getProperty(name, type);
+        }
+        return answer;
+    }
+
+    /**
      * Returns the mandatory inbound message body of the correct type or throws
      * an exception if it is not present
      *

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
new file mode 100644
index 0000000..4f8f845
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.function;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+public final class Suppliers {
+    private Suppliers() {
+    }
+
+    public static <T> Supplier<T> memorize(Supplier<T> supplier) {
+        final AtomicReference<T> valueHolder = new AtomicReference<>();
+        return () -> {
+            T supplied = valueHolder.get();
+            if (supplied == null) {
+                synchronized (valueHolder) {
+                    supplied = valueHolder.get();
+                    if (supplied == null) {
+                        supplied = Objects.requireNonNull(supplier.get(), "Supplier should not return null");
+                        valueHolder.lazySet(supplied);
+                    }
+                }
+            }
+            return supplied;
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
index 43bd8ff..ee14ca0 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
@@ -90,13 +90,18 @@ public class DefaultExchangeTest extends ExchangeTestSupport {
         assertEquals(new Integer(123), exchange.getIn().getHeader("bar", Integer.class));
         assertEquals("123", exchange.getIn().getHeader("bar", String.class));
         assertEquals(123, exchange.getIn().getHeader("bar", 234));
+        assertEquals(123, exchange.getIn().getHeader("bar", () -> 456));
+        assertEquals(456, exchange.getIn().getHeader("baz", () -> 456));
 
         assertEquals(123, exchange.getIn().getHeader("bar", 234));
         assertEquals(new Integer(123), exchange.getIn().getHeader("bar", 234, Integer.class));
         assertEquals("123", exchange.getIn().getHeader("bar", "234", String.class));
+        assertEquals("123", exchange.getIn().getHeader("bar", () -> "456", String.class));
+        assertEquals("456", exchange.getIn().getHeader("baz", () -> "456", String.class));
 
         assertEquals(234, exchange.getIn().getHeader("cheese", 234));
         assertEquals("234", exchange.getIn().getHeader("cheese", 234, String.class));
+        assertEquals("456", exchange.getIn().getHeader("cheese", () -> 456, String.class));
     }
 
     public void testProperty() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
new file mode 100644
index 0000000..4f68bc0
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.ExchangeHelper;
+
+public class DynamicRouter4Test extends ContextTestSupport {
+    public void testDynamicRouter() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:c").expectedMessageCount(1);
+
+        template.sendBody("direct:start-1", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start-1")
+                    .dynamicRouter()
+                        .exchange(DynamicRouter4Test::slip);
+            }
+        };
+    }
+
+    public static String slip(Exchange exchange) {
+        String previous = ExchangeHelper.getHeaderOrProperty(exchange, Exchange.SLIP_ENDPOINT, String.class);
+        if (previous == null) {
+            return "mock:a,mock:b";
+        } else if ("mock://b".equals(previous)) {
+            return "mock:c";
+        }
+
+        // no more so return null
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
new file mode 100644
index 0000000..9afd3f9
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+
+public class IdempotentConsumerDslTest extends ContextTestSupport {
+
+    public void testDuplicateMessages() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("one", "two", "three");
+
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                    .idempotentConsumer()
+                        .message(m -> m.getHeader("messageId"))
+                        .messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200))
+                    .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
index 2ef927a..54b5f6a 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
@@ -21,11 +21,20 @@ import org.apache.camel.builder.RouteBuilder;
 
 public class LoopDoWhileTest extends ContextTestSupport {
 
-    public void testLoopDoWhile() throws Exception {
+    public void testLoopDoWhileSimple() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
         getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
 
-        template.sendBody("direct:start", "A");
+        template.sendBody("direct:simple", "A");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testLoopDoWhileFunctional() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
+        getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
+
+        template.sendBody("direct:functional", "A");
 
         assertMockEndpointsSatisfied();
     }
@@ -35,12 +44,20 @@ public class LoopDoWhileTest extends ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start")
+                from("direct:simple")
                     .loopDoWhile(simple("${body.length} <= 5"))
                         .to("mock:loop")
                         .transform(body().append("A"))
                     .end()
                     .to("mock:result");
+                from("direct:functional")
+                    .loopDoWhile()
+                        .body(String.class, b -> b.length() <= 5)
+                        .to("mock:loop")
+                        .transform()
+                            .body(String.class, b -> b += "A")
+                    .end()
+                    .to("mock:result");
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
new file mode 100644
index 0000000..a800e36
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class MulticastDslTest extends ContextTestSupport {
+    public void testMulticastDsl() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived("onPrepare", true);
+        mock.expectedBodiesReceived(5);
+
+        template.sendBody("direct:start", 1);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .multicast()
+                        .onPrepare()
+                            .message(m -> m.setHeader("onPrepare", true))
+                        .aggregationStrategy()
+                            .body(Integer.class, (o, n) -> o != null ? o + n : n)
+                        .to("direct:increase-by-1")
+                        .to("direct:increase-by-2")
+                        .end()
+                    .to("mock:result");
+
+                from("direct:increase-by-1")
+                    .bean(new Increase(1));
+                from("direct:increase-by-2")
+                    .bean(new Increase(2));
+            }
+        };
+    }
+
+    public static class Increase {
+        private final int amount;
+        public Increase(int amount) {
+            this.amount = amount;
+        }
+
+        public int add(int num) {
+            return num + amount;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
new file mode 100644
index 0000000..9d296b8
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class RoutingSlipDslTest extends ContextTestSupport {
+
+    public void testRoutingSlipDsl() throws Exception {
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        template.sendBodyAndHeader("direct:a", "foo", "recipientListHeader", "mock:x,mock:y,mock:z");
+        template.sendBodyAndHeader("direct:a", "bar", "recipientListHeader", "mock:x,mock:y,mock:z");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a").routingSlip()
+                    .message(m -> m.getHeader("recipientListHeader", String.class).split(","))
+                    .end();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
new file mode 100644
index 0000000..a971332
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class ThrottlerDslTest extends ContextTestSupport {
+    private static final int INTERVAL = 500;
+    protected int messageCount = 9;
+
+    protected boolean canTest() {
+        // skip test on windows as it does not run well there
+        return !isPlatform("windows");
+    }
+
+    public void testDsl() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(messageCount);
+
+        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < messageCount; i++) {
+            executor.execute(() -> template.sendBodyAndHeader("direct:start", "payload", "ThrottleCount", 1));
+        }
+
+        // let's wait for the exchanges to arrive
+        resultEndpoint.assertIsSatisfied();
+
+        // now assert that they have actually been throttled
+        long minimumTime = (messageCount - 1) * INTERVAL;
+        // add a little slack
+        long delta = System.currentTimeMillis() - start + 200;
+        assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
+        executor.shutdownNow();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .throttle()
+                        .message(m -> m.getHeader("ThrottleCount", Integer.class))
+                        .timePeriodMillis(INTERVAL)
+                    .to("log:result", "mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
index 55fd14e..f8d1db4 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
@@ -21,20 +21,21 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 
 public class AggregateDslTest extends ContextTestSupport {
 
     public void testAggregate() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:aggregated");
-        mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("0,3", "1,4", "2,5");
+        getMockEndpoint("mock:aggregated-supplier").expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
 
         for (int i = 0; i < 9; i++) {
             template.sendBodyAndHeader("direct:start", i, "type", i % 3);
+            template.sendBodyAndHeader("direct:start-supplier", i, "type", i % 3);
         }
 
-        mock.assertIsSatisfied();
+        assertMockEndpointsSatisfied();
     }
 
     @Override
@@ -46,12 +47,38 @@ public class AggregateDslTest extends ContextTestSupport {
                     .aggregate()
                         .message(m -> m.getHeader("type"))
                         .strategy()
-                            .body(String.class, (o, n) ->  Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(",")))
+                            .body(String.class, AggregateDslTest::joinString)
                         .completion()
-                            .body(String.class, s -> s.length() == 5)
-                                    .to("mock:aggregated");
+                            .body(String.class, s -> s.split(",").length == 2)
+                    .to("mock:aggregated");
+
+                from("direct:start-supplier")
+                    .aggregate()
+                        .header("type")
+                        .strategy(AggregateDslTest::joinStringStrategy)
+                        .completion()
+                            .body(String.class, s -> s.split(",").length == 3)
+                    .to("mock:aggregated-supplier");
             }
         };
     }
+
+    // *************************************************************************
+    // Strategies
+    // *************************************************************************
+
+    private static String joinString(String o, String n) {
+        return Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(","));
+    }
+
+    private static Exchange joinStringStrategy(Exchange oldExchange, Exchange newExchange) {
+        newExchange.getIn().setBody(
+            joinString(
+                oldExchange != null ? oldExchange.getIn().getBody(String.class) : null,
+                newExchange.getIn().getBody(String.class))
+        );
+
+        return newExchange;
+    }
 }
 


Re: camel git commit: CAMEL-10724: Improve Java DSL support for Java 8

Posted by Luca Burgazzoli <lb...@gmail.com>.
Should be fixed now

---
Luca Burgazzoli


On Thu, Feb 16, 2017 at 8:01 PM, Claus Ibsen <cl...@gmail.com> wrote:
> Hi Luca
>
> This happens also for me. There is a little secret that if you change
> the Message API then the Scala DSL needs to be changed accordingly.
>
> The camel-scala can not be compiled now.
>
> On Thu, Feb 16, 2017 at 6:10 PM,  <lb...@apache.org> wrote:
>> Repository: camel
>> Updated Branches:
>>   refs/heads/master 8e0e3083e -> 8bc8484b1
>>
>>
>> CAMEL-10724: Improve Java DSL support for Java 8
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bc8484b
>> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bc8484b
>> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bc8484b
>>
>> Branch: refs/heads/master
>> Commit: 8bc8484b1914f5cb29191e7b91fe48e02ca1f636
>> Parents: 8e0e308
>> Author: lburgazzoli <lb...@gmail.com>
>> Authored: Wed Jan 18 18:09:08 2017 +0100
>> Committer: lburgazzoli <lb...@gmail.com>
>> Committed: Thu Feb 16 18:10:22 2017 +0100
>>
>> ----------------------------------------------------------------------
>>  .../src/main/java/org/apache/camel/Message.java | 16 ++++-
>>  .../org/apache/camel/impl/DefaultMessage.java   | 31 +++++++++
>>  .../apache/camel/model/AggregateDefinition.java | 20 ++++++
>>  .../model/IdempotentConsumerDefinition.java     |  9 +--
>>  .../apache/camel/model/MulticastDefinition.java | 26 +++++++
>>  .../apache/camel/model/ProcessorDefinition.java | 66 +++++++++++++++++-
>>  .../org/apache/camel/util/ExchangeHelper.java   | 18 +++++
>>  .../apache/camel/util/function/Suppliers.java   | 43 ++++++++++++
>>  .../apache/camel/impl/DefaultExchangeTest.java  |  5 ++
>>  .../camel/processor/DynamicRouter4Test.java     | 58 ++++++++++++++++
>>  .../processor/IdempotentConsumerDslTest.java    | 53 ++++++++++++++
>>  .../apache/camel/processor/LoopDoWhileTest.java | 23 ++++++-
>>  .../camel/processor/MulticastDslTest.java       | 69 +++++++++++++++++++
>>  .../camel/processor/RoutingSlipDslTest.java     | 49 +++++++++++++
>>  .../camel/processor/ThrottlerDslTest.java       | 72 ++++++++++++++++++++
>>  .../processor/aggregator/AggregateDslTest.java  | 41 +++++++++--
>>  16 files changed, 582 insertions(+), 17 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/Message.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/Message.java b/camel-core/src/main/java/org/apache/camel/Message.java
>> index 8cfeae0..a0e9c4d 100644
>> --- a/camel-core/src/main/java/org/apache/camel/Message.java
>> +++ b/camel-core/src/main/java/org/apache/camel/Message.java
>> @@ -18,7 +18,7 @@ package org.apache.camel;
>>
>>  import java.util.Map;
>>  import java.util.Set;
>> -
>> +import java.util.function.Supplier;
>>  import javax.activation.DataHandler;
>>
>>  /**
>> @@ -88,6 +88,13 @@ public interface Message {
>>      Object getHeader(String name, Object defaultValue);
>>
>>      /**
>> +     * Accesses a specific header, invoking the supplier for a default value only if the header is null.
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     */
>> +    Object getHeader(String name, Supplier<Object> defaultValueSupplier);
>> +
>> +    /**
>>       * Returns a header associated with this message by name and specifying the
>>       * type required
>>       *
>> @@ -112,6 +119,13 @@ public interface Message {
>>      <T> T getHeader(String name, Object defaultValue, Class<T> type);
>>
>>      /**
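>> +     * Returns a header by name converted to the required type, invoking the supplier for a default value if the header is null.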
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     */
>> +    <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type);
>> +
>> +    /**
>>       * Sets a header on the message
>>       *
>>       * @param name of the header
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> index 848586a..1e49766 100644
>> --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> @@ -20,6 +20,7 @@ import java.util.HashSet;
>>  import java.util.LinkedHashMap;
>>  import java.util.Map;
>>  import java.util.Set;
>> +import java.util.function.Supplier;
>>  import javax.activation.DataHandler;
>>
>>  import org.apache.camel.Attachment;
>> @@ -65,6 +66,11 @@ public class DefaultMessage extends MessageSupport {
>>          return answer != null ? answer : defaultValue;
>>      }
>>
>> +    public Object getHeader(String name, Supplier<Object> defaultValueSupplier) {
>> +        Object answer = getHeaders().get(name);
>> +        return answer != null ? answer : defaultValueSupplier.get();
>> +    }
>> +
>>      @SuppressWarnings("unchecked")
>>      public <T> T getHeader(String name, Class<T> type) {
>>          Object value = getHeader(name);
>> @@ -115,6 +121,31 @@ public class DefaultMessage extends MessageSupport {
>>          }
>>      }
>>
>> +    @SuppressWarnings("unchecked")
>> +    public <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type) {
>> +        Object value = getHeader(name, defaultValueSupplier);
>> +        if (value == null) {
>> +            // lets avoid NullPointerException when converting to boolean for null values
>> +            if (boolean.class.isAssignableFrom(type)) {
>> +                return (T) Boolean.FALSE;
>> +            }
>> +            return null;
>> +        }
>> +
>> +        // eager same instance type test to avoid the overhead of invoking the type converter
>> +        // if already same type
>> +        if (type.isInstance(value)) {
>> +            return type.cast(value);
>> +        }
>> +
>> +        Exchange e = getExchange();
>> +        if (e != null) {
>> +            return e.getContext().getTypeConverter().convertTo(type, e, value);
>> +        } else {
>> +            return type.cast(value);
>> +        }
>> +    }
>> +
>>      public void setHeader(String name, Object value) {
>>          if (headers == null) {
>>              headers = createHeaders();
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> index ec7d396..12f0a13 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> @@ -814,6 +814,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>>      }
>>
>>      /**
>> +     * Sets the aggregate strategy to use; shorthand for {@link #aggregationStrategy(AggregationStrategy)}.
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
>> +        return aggregationStrategy(aggregationStrategy);
>> +    }
>> +
>> +    /**
>>       * Sets the aggregate strategy to use
>>       *
>>       * @param aggregationStrategy  the aggregate strategy to use
>> @@ -930,6 +940,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>>      }
>>
>>      /**
>> +     * Shorthand for {@link #completionPredicate(Predicate)}.
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public AggregateDefinition completion(@AsPredicate Predicate predicate) {
>> +        return completionPredicate(predicate);
>> +    }
>> +
>> +    /**
>>       * Indicates to complete all current aggregated exchanges when the context is stopped
>>       */
>>      public AggregateDefinition forceCompletionOnStop() {
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> index 256394d..9a58704 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> @@ -221,8 +221,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>>      public Processor createProcessor(RouteContext routeContext) throws Exception {
>>          Processor childProcessor = this.createChildProcessor(routeContext, true);
>>
>> -        IdempotentRepository<String> idempotentRepository =
>> -                (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
>> +        IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext);
>>          ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
>>
>>          Expression expression = getExpression().createExpression(routeContext);
>> @@ -231,6 +230,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>>          boolean eager = getEager() == null || getEager();
>>          boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
>>          boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
>> +
>>          // these boolean should be false by default
>>          boolean completionEager = getCompletionEager() != null && getCompletionEager();
>>
>> @@ -243,10 +243,11 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>>       * @param routeContext route context
>>       * @return the repository
>>       */
>> -    protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
>> +    @SuppressWarnings("unchecked")
>> +    protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) {
>>          if (messageIdRepositoryRef != null) {
>>              idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
>>          }
>> -        return idempotentRepository;
>> +        return (IdempotentRepository<T>)idempotentRepository;
>>      }
>>  }
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> index 7bff217..37efcc6 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> @@ -27,6 +27,8 @@ import javax.xml.bind.annotation.XmlTransient;
>>
>>  import org.apache.camel.CamelContextAware;
>>  import org.apache.camel.Processor;
>> +import org.apache.camel.builder.AggregationStrategyClause;
>> +import org.apache.camel.builder.ProcessClause;
>>  import org.apache.camel.processor.MulticastProcessor;
>>  import org.apache.camel.processor.aggregate.AggregationStrategy;
>>  import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
>> @@ -106,6 +108,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
>>      // -------------------------------------------------------------------------
>>
>>      /**
>> +     * Installs a new {@link AggregationStrategyClause} as the aggregation strategy so it can be defined fluently.
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public AggregationStrategyClause<MulticastDefinition> aggregationStrategy() {
>> +        AggregationStrategyClause<MulticastDefinition> clause = new AggregationStrategyClause<>(this);
>> +        setAggregationStrategy(clause);
>> +        return clause;
>> +    }
>> +
>> +    /**
>>       * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
>>       * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
>>       * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
>> @@ -248,6 +262,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
>>      }
>>
>>      /**
>> +     * Installs a new {@link ProcessClause} as the onPrepare processor so it can be defined fluently.
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public ProcessClause<MulticastDefinition> onPrepare() {
>> +        ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this);
>> +        setOnPrepare(clause);
>> +        return clause;
>> +    }
>> +
>> +    /**
>>       * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
>>       * This can be used to deep-clone messages that should be send, or any custom logic needed before
>>       * the exchange is send.
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> index 21fbe2e..c40a0bd 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> @@ -28,6 +28,7 @@ import java.util.Map;
>>  import java.util.concurrent.ExecutorService;
>>  import java.util.concurrent.TimeUnit;
>>  import java.util.concurrent.atomic.AtomicInteger;
>> +import java.util.function.Supplier;
>>  import javax.xml.bind.annotation.XmlAccessType;
>>  import javax.xml.bind.annotation.XmlAccessorType;
>>  import javax.xml.bind.annotation.XmlAnyAttribute;
>> @@ -73,6 +74,7 @@ import org.apache.camel.spi.InterceptStrategy;
>>  import org.apache.camel.spi.LifecycleStrategy;
>>  import org.apache.camel.spi.Policy;
>>  import org.apache.camel.spi.RouteContext;
>> +import org.apache.camel.support.ExpressionAdapter;
>>  import org.slf4j.Logger;
>>  import org.slf4j.LoggerFactory;
>>
>> @@ -1408,6 +1410,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>      }
>>
>>      /**
>> +     * Adds an {@link IdempotentConsumerDefinition} as output; its expression is defined through the returned clause.
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer() {
>> +        IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
>> +        addOutput(answer);
>> +
>> +        return ExpressionClause.createAndSetExpression(answer);
>> +    }
>> +
>> +    /**
>>       * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
>>       * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
>>       * to avoid duplicate messages
>> @@ -2096,7 +2111,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>       */
>>      public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
>>          AggregateDefinition answer = new AggregateDefinition();
>> -        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(answer);
>> +        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<>(answer);
>>          answer.setExpression(clause);
>>          answer.setAggregationStrategy(aggregationStrategy);
>>          addOutput(answer);
>> @@ -2173,6 +2188,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>      }
>>
>>      /**
>> +     * Adds a {@link ThrottleDefinition} as output; its expression is defined through the returned clause.
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public ExpressionClause<ThrottleDefinition> throttle() {
>> +        ThrottleDefinition answer = new ThrottleDefinition();
>> +        addOutput(answer);
>> +
>> +        return ExpressionClause.createAndSetExpression(answer);
>> +    }
>> +
>> +    /**
>>       * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
>>       * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
>>       * or that we don't exceed an agreed SLA with some external service.
>> @@ -2246,6 +2274,21 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>      }
>>
>>      /**
>> +     * Adds a {@link LoopDefinition} in do-while mode as output; its expression is defined through the returned clause.
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public ExpressionClause<LoopDefinition> loopDoWhile() {
>> +        LoopDefinition loop = new LoopDefinition();
>> +        loop.setDoWhile(true);
>> +
>> +        addOutput(loop);
>> +
>> +        return ExpressionClause.createAndSetExpression(loop);
>> +    }
>> +
>> +    /**
>>       * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
>>       * Creates a loop allowing to process the a message a number of times and possibly process them
>>       * in a different way.
>> @@ -3094,6 +3137,26 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>      }
>>
>>      /**
>> +     * Adds a processor which sets the header on the IN message
>> +     *
>> +     * @param name  the header name
>> +     * @param supplier the supplier used to set the header
>> +     * @return the builder
>> +     */
>> +    @SuppressWarnings("unchecked")
>> +    public Type setHeader(String name, final Supplier<Object> supplier) {
>> +        SetHeaderDefinition answer = new SetHeaderDefinition(name, new ExpressionAdapter() {
>> +            @Override
>> +            public Object evaluate(Exchange exchange) {
>> +                return supplier.get();
>> +            }
>> +        });
>> +
>> +        addOutput(answer);
>> +        return (Type) this;
>> +    }
>> +
>> +    /**
>>       * Adds a processor which sets the header on the OUT message
>>       *
>>       * @param name  the header name
>> @@ -4021,5 +4084,4 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>      public String getLabel() {
>>          return "";
>>      }
>> -
>>  }
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> index ce3fdca..6e5967e 100644
>> --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> @@ -130,6 +130,24 @@ public final class ExchangeHelper {
>>      }
>>
>>      /**
>> +     * Gets a header or, if the header is null, a property of the correct type
>> +     *
>> +     * @param exchange      the exchange
>> +     * @param name          the name of the header or the property
>> +     * @param type          the type
>> +     * @return the header or property value
>> +     * @throws TypeConversionException is thrown if error during type conversion
>> +     * @throws NoSuchHeaderException is thrown if no headers exists
>> +     */
>> +    public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException {
>> +        T answer = exchange.getIn().getHeader(name, type);
>> +        if (answer == null) {
>> +            answer = exchange.getProperty(name, type);
>> +        }
>> +        return answer;
>> +    }
>> +
>> +    /**
>>       * Returns the mandatory inbound message body of the correct type or throws
>>       * an exception if it is not present
>>       *
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
>> new file mode 100644
>> index 0000000..4f8f845
>> --- /dev/null
>> +++ b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
>> @@ -0,0 +1,43 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.util.function;
>> +
>> +import java.util.Objects;
>> +import java.util.concurrent.atomic.AtomicReference;
>> +import java.util.function.Supplier;
>> +
>> +public final class Suppliers {
>> +    private Suppliers() {
>> +    }
>> +
>> +    public static <T> Supplier<T> memorize(Supplier<T> supplier) {
>> +        final AtomicReference<T> valueHolder = new AtomicReference<>();
>> +        return () -> {
>> +            T supplied = valueHolder.get();
>> +            if (supplied == null) {
>> +                synchronized (valueHolder) {
>> +                    supplied = valueHolder.get();
>> +                    if (supplied == null) {
>> +                        supplied = Objects.requireNonNull(supplier.get(), "Supplier should not return null");
>> +                        valueHolder.lazySet(supplied);
>> +                    }
>> +                }
>> +            }
>> +            return supplied;
>> +        };
>> +    }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> index 43bd8ff..ee14ca0 100644
>> --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> @@ -90,13 +90,18 @@ public class DefaultExchangeTest extends ExchangeTestSupport {
>>          assertEquals(new Integer(123), exchange.getIn().getHeader("bar", Integer.class));
>>          assertEquals("123", exchange.getIn().getHeader("bar", String.class));
>>          assertEquals(123, exchange.getIn().getHeader("bar", 234));
>> +        assertEquals(123, exchange.getIn().getHeader("bar", () -> 456));
>> +        assertEquals(456, exchange.getIn().getHeader("baz", () -> 456));
>>
>>          assertEquals(123, exchange.getIn().getHeader("bar", 234));
>>          assertEquals(new Integer(123), exchange.getIn().getHeader("bar", 234, Integer.class));
>>          assertEquals("123", exchange.getIn().getHeader("bar", "234", String.class));
>> +        assertEquals("123", exchange.getIn().getHeader("bar", () -> "456", String.class));
>> +        assertEquals("456", exchange.getIn().getHeader("baz", () -> "456", String.class));
>>
>>          assertEquals(234, exchange.getIn().getHeader("cheese", 234));
>>          assertEquals("234", exchange.getIn().getHeader("cheese", 234, String.class));
>> +        assertEquals("456", exchange.getIn().getHeader("cheese", () -> 456, String.class));
>>      }
>>
>>      public void testProperty() throws Exception {
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
>> new file mode 100644
>> index 0000000..4f68bc0
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
>> @@ -0,0 +1,58 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.Exchange;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.util.ExchangeHelper;
>> +
>> +public class DynamicRouter4Test extends ContextTestSupport {
>> +    public void testDynamicRouter() throws Exception {
>> +        getMockEndpoint("mock:a").expectedMessageCount(1);
>> +        getMockEndpoint("mock:b").expectedMessageCount(1);
>> +        getMockEndpoint("mock:c").expectedMessageCount(1);
>> +
>> +        template.sendBody("direct:start-1", "Hello World");
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    @Override
>> +    protected RouteBuilder createRouteBuilder() throws Exception {
>> +        return new RouteBuilder() {
>> +            @Override
>> +            public void configure() throws Exception {
>> +                from("direct:start-1")
>> +                    .dynamicRouter()
>> +                        .exchange(DynamicRouter4Test::slip);
>> +            }
>> +        };
>> +    }
>> +
>> +    public static String slip(Exchange exchange) {
>> +        String previous = ExchangeHelper.getHeaderOrProperty(exchange, Exchange.SLIP_ENDPOINT, String.class);
>> +        if (previous == null) {
>> +            return "mock:a,mock:b";
>> +        } else if ("mock://b".equals(previous)) {
>> +            return "mock:c";
>> +        }
>> +
>> +        // no more so return null
>> +        return null;
>> +    }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
>> new file mode 100644
>> index 0000000..9afd3f9
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
>> @@ -0,0 +1,53 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
>> +
>> +public class IdempotentConsumerDslTest extends ContextTestSupport {
>> +
>> +    public void testDuplicateMessages() throws Exception {
>> +        MockEndpoint mock = getMockEndpoint("mock:result");
>> +        mock.expectedBodiesReceived("one", "two", "three");
>> +
>> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
>> +        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
>> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
>> +        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
>> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
>> +        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
>> +
>> +        mock.assertIsSatisfied();
>> +    }
>> +
>> +    @Override
>> +    protected RouteBuilder createRouteBuilder() throws Exception {
>> +        return new RouteBuilder() {
>> +            @Override
>> +            public void configure() {
>> +                from("direct:start")
>> +                    .idempotentConsumer()
>> +                        .message(m -> m.getHeader("messageId"))
>> +                        .messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200))
>> +                    .to("mock:result");
>> +            }
>> +        };
>> +    }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> index 2ef927a..54b5f6a 100644
>> --- a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> @@ -21,11 +21,20 @@ import org.apache.camel.builder.RouteBuilder;
>>
>>  public class LoopDoWhileTest extends ContextTestSupport {
>>
>> -    public void testLoopDoWhile() throws Exception {
>> +    public void testLoopDoWhileSimple() throws Exception {
>>          getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
>>          getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
>>
>> -        template.sendBody("direct:start", "A");
>> +        template.sendBody("direct:simple", "A");
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    public void testLoopDoWhileFunctional() throws Exception {
>> +        getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
>> +        getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
>> +
>> +        template.sendBody("direct:functional", "A");
>>
>>          assertMockEndpointsSatisfied();
>>      }
>> @@ -35,12 +44,20 @@ public class LoopDoWhileTest extends ContextTestSupport {
>>          return new RouteBuilder() {
>>              @Override
>>              public void configure() throws Exception {
>> -                from("direct:start")
>> +                from("direct:simple")
>>                      .loopDoWhile(simple("${body.length} <= 5"))
>>                          .to("mock:loop")
>>                          .transform(body().append("A"))
>>                      .end()
>>                      .to("mock:result");
>> +                from("direct:functional")
>> +                    .loopDoWhile()
>> +                        .body(String.class, b -> b.length() <= 5)
>> +                        .to("mock:loop")
>> +                        .transform()
>> +                            .body(String.class, b -> b += "A")
>> +                    .end()
>> +                    .to("mock:result");
>>              }
>>          };
>>      }
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
>> new file mode 100644
>> index 0000000..a800e36
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
>> @@ -0,0 +1,69 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +
>> +public class MulticastDslTest extends ContextTestSupport {
>> +    public void testMulticastDsl() throws Exception {
>> +        MockEndpoint mock = getMockEndpoint("mock:result");
>> +        mock.expectedMessageCount(1);
>> +        mock.expectedHeaderReceived("onPrepare", true);
>> +        mock.expectedBodiesReceived(5);
>> +
>> +        template.sendBody("direct:start", 1);
>> +
>> +        mock.assertIsSatisfied();
>> +    }
>> +
>> +    @Override
>> +    protected RouteBuilder createRouteBuilder() throws Exception {
>> +        return new RouteBuilder() {
>> +            @Override
>> +            public void configure() throws Exception {
>> +                from("direct:start")
>> +                    .multicast()
>> +                        .onPrepare()
>> +                            .message(m -> m.setHeader("onPrepare", true))
>> +                        .aggregationStrategy()
>> +                            .body(Integer.class, (o, n) -> o != null ? o + n : n)
>> +                        .to("direct:increase-by-1")
>> +                        .to("direct:increase-by-2")
>> +                        .end()
>> +                    .to("mock:result");
>> +
>> +                from("direct:increase-by-1")
>> +                    .bean(new Increase(1));
>> +                from("direct:increase-by-2")
>> +                    .bean(new Increase(2));
>> +            }
>> +        };
>> +    }
>> +
>> +    public static class Increase {
>> +        private final int amount;
>> +        public Increase(int amount) {
>> +            this.amount = amount;
>> +        }
>> +
>> +        public int add(int num) {
>> +            return num + amount;
>> +        }
>> +    }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
>> new file mode 100644
>> index 0000000..9d296b8
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
>> @@ -0,0 +1,49 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +
>> +public class RoutingSlipDslTest extends ContextTestSupport {
>> +
>> +    public void testRoutingSlipDsl() throws Exception {
>> +        MockEndpoint x = getMockEndpoint("mock:x");
>> +        MockEndpoint y = getMockEndpoint("mock:y");
>> +        MockEndpoint z = getMockEndpoint("mock:z");
>> +
>> +        x.expectedBodiesReceived("foo", "bar");
>> +        y.expectedBodiesReceived("foo", "bar");
>> +        z.expectedBodiesReceived("foo", "bar");
>> +
>> +        template.sendBodyAndHeader("direct:a", "foo", "recipientListHeader", "mock:x,mock:y,mock:z");
>> +        template.sendBodyAndHeader("direct:a", "bar", "recipientListHeader", "mock:x,mock:y,mock:z");
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    protected RouteBuilder createRouteBuilder() {
>> +        return new RouteBuilder() {
>> +            public void configure() {
>> +                from("direct:a").routingSlip()
>> +                    .message(m -> m.getHeader("recipientListHeader", String.class).split(","))
>> +                    .end();
>> +            }
>> +        };
>> +    }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
>> new file mode 100644
>> index 0000000..a971332
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
>> @@ -0,0 +1,72 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import java.util.concurrent.ExecutorService;
>> +import java.util.concurrent.Executors;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +
>> +public class ThrottlerDslTest extends ContextTestSupport {
>> +    private static final int INTERVAL = 500;
>> +    protected int messageCount = 9;
>> +
>> +    protected boolean canTest() {
>> +        // skip test on windows as it does not run well there
>> +        return !isPlatform("windows");
>> +    }
>> +
>> +    public void testDsl() throws Exception {
>> +        if (!canTest()) {
>> +            return;
>> +        }
>> +
>> +        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
>> +        resultEndpoint.expectedMessageCount(messageCount);
>> +
>> +        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
>> +
>> +        long start = System.currentTimeMillis();
>> +        for (int i = 0; i < messageCount; i++) {
>> +            executor.execute(() -> template.sendBodyAndHeader("direct:start", "payload", "ThrottleCount", 1));
>> +        }
>> +
>> +        // let's wait for the exchanges to arrive
>> +        resultEndpoint.assertIsSatisfied();
>> +
>> +        // now assert that they have actually been throttled
>> +        long minimumTime = (messageCount - 1) * INTERVAL;
>> +        // add a little slack
>> +        long delta = System.currentTimeMillis() - start + 200;
>> +        assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
>> +        executor.shutdownNow();
>> +    }
>> +
>> +    protected RouteBuilder createRouteBuilder() {
>> +        return new RouteBuilder() {
>> +            public void configure() {
>> +                from("direct:start")
>> +                    .throttle()
>> +                        .message(m -> m.getHeader("ThrottleCount", Integer.class))
>> +                        .timePeriodMillis(INTERVAL)
>> +                    .to("log:result", "mock:result");
>> +            }
>> +        };
>> +    }
>> +}
>> \ No newline at end of file
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> index 55fd14e..f8d1db4 100644
>> --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> @@ -21,20 +21,21 @@ import java.util.stream.Collectors;
>>  import java.util.stream.Stream;
>>
>>  import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.Exchange;
>>  import org.apache.camel.builder.RouteBuilder;
>> -import org.apache.camel.component.mock.MockEndpoint;
>>
>>  public class AggregateDslTest extends ContextTestSupport {
>>
>>      public void testAggregate() throws Exception {
>> -        MockEndpoint mock = getMockEndpoint("mock:aggregated");
>> -        mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
>> +        getMockEndpoint("mock:aggregated").expectedBodiesReceived("0,3", "1,4", "2,5");
>> +        getMockEndpoint("mock:aggregated-supplier").expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
>>
>>          for (int i = 0; i < 9; i++) {
>>              template.sendBodyAndHeader("direct:start", i, "type", i % 3);
>> +            template.sendBodyAndHeader("direct:start-supplier", i, "type", i % 3);
>>          }
>>
>> -        mock.assertIsSatisfied();
>> +        assertMockEndpointsSatisfied();
>>      }
>>
>>      @Override
>> @@ -46,12 +47,38 @@ public class AggregateDslTest extends ContextTestSupport {
>>                      .aggregate()
>>                          .message(m -> m.getHeader("type"))
>>                          .strategy()
>> -                            .body(String.class, (o, n) ->  Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(",")))
>> +                            .body(String.class, AggregateDslTest::joinString)
>>                          .completion()
>> -                            .body(String.class, s -> s.length() == 5)
>> -                                    .to("mock:aggregated");
>> +                            .body(String.class, s -> s.split(",").length == 2)
>> +                    .to("mock:aggregated");
>> +
>> +                from("direct:start-supplier")
>> +                    .aggregate()
>> +                        .header("type")
>> +                        .strategy(AggregateDslTest::joinStringStrategy)
>> +                        .completion()
>> +                            .body(String.class, s -> s.split(",").length == 3)
>> +                    .to("mock:aggregated-supplier");
>>              }
>>          };
>>      }
>> +
>> +    // *************************************************************************
>> +    // Strategies
>> +    // *************************************************************************
>> +
>> +    private static String joinString(String o, String n) {
>> +        return Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(","));
>> +    }
>> +
>> +    private static Exchange joinStringStrategy(Exchange oldExchange, Exchange newExchange) {
>> +        newExchange.getIn().setBody(
>> +            joinString(
>> +                oldExchange != null ? oldExchange.getIn().getBody(String.class) : null,
>> +                newExchange.getIn().getBody(String.class))
>> +        );
>> +
>> +        return newExchange;
>> +    }
>>  }
>>
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>


Re: camel git commit: CAMEL-10724: Improve Java DSL support for Java 8

Posted by Claus Ibsen <cl...@gmail.com>.
Hi Luca

This also happens for me. There is a little-known catch: if you change
the Message API, the Scala DSL needs to be changed accordingly.

camel-scala cannot be compiled now.

On Thu, Feb 16, 2017 at 6:10 PM,  <lb...@apache.org> wrote:
> Repository: camel
> Updated Branches:
>   refs/heads/master 8e0e3083e -> 8bc8484b1
>
>
> CAMEL-10724: Improve Java DSL support for Java 8
>
>
> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bc8484b
> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bc8484b
> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bc8484b
>
> Branch: refs/heads/master
> Commit: 8bc8484b1914f5cb29191e7b91fe48e02ca1f636
> Parents: 8e0e308
> Author: lburgazzoli <lb...@gmail.com>
> Authored: Wed Jan 18 18:09:08 2017 +0100
> Committer: lburgazzoli <lb...@gmail.com>
> Committed: Thu Feb 16 18:10:22 2017 +0100
>
> ----------------------------------------------------------------------
>  .../src/main/java/org/apache/camel/Message.java | 16 ++++-
>  .../org/apache/camel/impl/DefaultMessage.java   | 31 +++++++++
>  .../apache/camel/model/AggregateDefinition.java | 20 ++++++
>  .../model/IdempotentConsumerDefinition.java     |  9 +--
>  .../apache/camel/model/MulticastDefinition.java | 26 +++++++
>  .../apache/camel/model/ProcessorDefinition.java | 66 +++++++++++++++++-
>  .../org/apache/camel/util/ExchangeHelper.java   | 18 +++++
>  .../apache/camel/util/function/Suppliers.java   | 43 ++++++++++++
>  .../apache/camel/impl/DefaultExchangeTest.java  |  5 ++
>  .../camel/processor/DynamicRouter4Test.java     | 58 ++++++++++++++++
>  .../processor/IdempotentConsumerDslTest.java    | 53 ++++++++++++++
>  .../apache/camel/processor/LoopDoWhileTest.java | 23 ++++++-
>  .../camel/processor/MulticastDslTest.java       | 69 +++++++++++++++++++
>  .../camel/processor/RoutingSlipDslTest.java     | 49 +++++++++++++
>  .../camel/processor/ThrottlerDslTest.java       | 72 ++++++++++++++++++++
>  .../processor/aggregator/AggregateDslTest.java  | 41 +++++++++--
>  16 files changed, 582 insertions(+), 17 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/Message.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/Message.java b/camel-core/src/main/java/org/apache/camel/Message.java
> index 8cfeae0..a0e9c4d 100644
> --- a/camel-core/src/main/java/org/apache/camel/Message.java
> +++ b/camel-core/src/main/java/org/apache/camel/Message.java
> @@ -18,7 +18,7 @@ package org.apache.camel;
>
>  import java.util.Map;
>  import java.util.Set;
> -
> +import java.util.function.Supplier;
>  import javax.activation.DataHandler;
>
>  /**
> @@ -88,6 +88,13 @@ public interface Message {
>      Object getHeader(String name, Object defaultValue);
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     */
> +    Object getHeader(String name, Supplier<Object> defaultValueSupplier);
> +
> +    /**
>       * Returns a header associated with this message by name and specifying the
>       * type required
>       *
> @@ -112,6 +119,13 @@ public interface Message {
>      <T> T getHeader(String name, Object defaultValue, Class<T> type);
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     */
> +    <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type);
> +
> +    /**
>       * Sets a header on the message
>       *
>       * @param name of the header
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> index 848586a..1e49766 100644
> --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> @@ -20,6 +20,7 @@ import java.util.HashSet;
>  import java.util.LinkedHashMap;
>  import java.util.Map;
>  import java.util.Set;
> +import java.util.function.Supplier;
>  import javax.activation.DataHandler;
>
>  import org.apache.camel.Attachment;
> @@ -65,6 +66,11 @@ public class DefaultMessage extends MessageSupport {
>          return answer != null ? answer : defaultValue;
>      }
>
> +    public Object getHeader(String name, Supplier<Object> defaultValueSupplier) {
> +        Object answer = getHeaders().get(name);
> +        return answer != null ? answer : defaultValueSupplier.get();
> +    }
> +
>      @SuppressWarnings("unchecked")
>      public <T> T getHeader(String name, Class<T> type) {
>          Object value = getHeader(name);
> @@ -115,6 +121,31 @@ public class DefaultMessage extends MessageSupport {
>          }
>      }
>
> +    @SuppressWarnings("unchecked")
> +    public <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type) {
> +        Object value = getHeader(name, defaultValueSupplier);
> +        if (value == null) {
> +            // lets avoid NullPointerException when converting to boolean for null values
> +            if (boolean.class.isAssignableFrom(type)) {
> +                return (T) Boolean.FALSE;
> +            }
> +            return null;
> +        }
> +
> +        // eager same instance type test to avoid the overhead of invoking the type converter
> +        // if already same type
> +        if (type.isInstance(value)) {
> +            return type.cast(value);
> +        }
> +
> +        Exchange e = getExchange();
> +        if (e != null) {
> +            return e.getContext().getTypeConverter().convertTo(type, e, value);
> +        } else {
> +            return type.cast(value);
> +        }
> +    }
> +
>      public void setHeader(String name, Object value) {
>          if (headers == null) {
>              headers = createHeaders();
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> index ec7d396..12f0a13 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> @@ -814,6 +814,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>      }
>
>      /**
> +     * Sets the aggregate strategy to use. This is a shorthand for {@link #aggregationStrategy(AggregationStrategy)}.
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
> +        return aggregationStrategy(aggregationStrategy);
> +    }
> +
> +    /**
>       * Sets the aggregate strategy to use
>       *
>       * @param aggregationStrategy  the aggregate strategy to use
> @@ -930,6 +940,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>      }
>
>      /**
> +     * Shorthand for {@link #completionPredicate(Predicate)}.
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public AggregateDefinition completion(@AsPredicate Predicate predicate) {
> +        return completionPredicate(predicate);
> +    }
> +
> +    /**
>       * Indicates to complete all current aggregated exchanges when the context is stopped
>       */
>      public AggregateDefinition forceCompletionOnStop() {
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> index 256394d..9a58704 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> @@ -221,8 +221,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>      public Processor createProcessor(RouteContext routeContext) throws Exception {
>          Processor childProcessor = this.createChildProcessor(routeContext, true);
>
> -        IdempotentRepository<String> idempotentRepository =
> -                (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
> +        IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext);
>          ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
>
>          Expression expression = getExpression().createExpression(routeContext);
> @@ -231,6 +230,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>          boolean eager = getEager() == null || getEager();
>          boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
>          boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
> +
>          // these boolean should be false by default
>          boolean completionEager = getCompletionEager() != null && getCompletionEager();
>
> @@ -243,10 +243,11 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>       * @param routeContext route context
>       * @return the repository
>       */
> -    protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
> +    @SuppressWarnings("unchecked")
> +    protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) {
>          if (messageIdRepositoryRef != null) {
>              idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
>          }
> -        return idempotentRepository;
> +        return (IdempotentRepository<T>)idempotentRepository;
>      }
>  }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> index 7bff217..37efcc6 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> @@ -27,6 +27,8 @@ import javax.xml.bind.annotation.XmlTransient;
>
>  import org.apache.camel.CamelContextAware;
>  import org.apache.camel.Processor;
> +import org.apache.camel.builder.AggregationStrategyClause;
> +import org.apache.camel.builder.ProcessClause;
>  import org.apache.camel.processor.MulticastProcessor;
>  import org.apache.camel.processor.aggregate.AggregationStrategy;
>  import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
> @@ -106,6 +108,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
>      // -------------------------------------------------------------------------
>
>      /**
> +     * Creates a fluent {@link AggregationStrategyClause} and registers it as the AggregationStrategy of this multicast.
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public AggregationStrategyClause<MulticastDefinition> aggregationStrategy() {
> +        AggregationStrategyClause<MulticastDefinition> clause = new AggregationStrategyClause<>(this);
> +        setAggregationStrategy(clause);
> +        return clause;
> +    }
> +
> +    /**
>       * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
>       * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
>       * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
> @@ -248,6 +262,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
>      }
>
>      /**
> +     * Creates a fluent {@link ProcessClause} and registers it as the onPrepare processor of this multicast.
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public ProcessClause<MulticastDefinition> onPrepare() {
> +        ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this);
> +        setOnPrepare(clause);
> +        return clause;
> +    }
> +
> +    /**
>       * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
>       * This can be used to deep-clone messages that should be send, or any custom logic needed before
>       * the exchange is send.
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> index 21fbe2e..c40a0bd 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> @@ -28,6 +28,7 @@ import java.util.Map;
>  import java.util.concurrent.ExecutorService;
>  import java.util.concurrent.TimeUnit;
>  import java.util.concurrent.atomic.AtomicInteger;
> +import java.util.function.Supplier;
>  import javax.xml.bind.annotation.XmlAccessType;
>  import javax.xml.bind.annotation.XmlAccessorType;
>  import javax.xml.bind.annotation.XmlAnyAttribute;
> @@ -73,6 +74,7 @@ import org.apache.camel.spi.InterceptStrategy;
>  import org.apache.camel.spi.LifecycleStrategy;
>  import org.apache.camel.spi.Policy;
>  import org.apache.camel.spi.RouteContext;
> +import org.apache.camel.support.ExpressionAdapter;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
>
> @@ -1408,6 +1410,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      }
>
>      /**
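> +     * Adds an {@link IdempotentConsumerDefinition} as an output and returns an expression clause used to configure its expression.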
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer() {
> +        IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
> +        addOutput(answer);
> +
> +        return ExpressionClause.createAndSetExpression(answer);
> +    }
> +
> +    /**
>       * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
>       * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
>       * to avoid duplicate messages
> @@ -2096,7 +2111,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>       */
>      public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
>          AggregateDefinition answer = new AggregateDefinition();
> -        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(answer);
> +        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<>(answer);
>          answer.setExpression(clause);
>          answer.setAggregationStrategy(aggregationStrategy);
>          addOutput(answer);
> @@ -2173,6 +2188,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      }
>
>      /**
> +     * Adds a {@link ThrottleDefinition} as an output and returns an expression clause used to configure its expression.
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public ExpressionClause<ThrottleDefinition> throttle() {
> +        ThrottleDefinition answer = new ThrottleDefinition();
> +        addOutput(answer);
> +
> +        return ExpressionClause.createAndSetExpression(answer);
> +    }
> +
> +    /**
>       * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
>       * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
>       * or that we don't exceed an agreed SLA with some external service.
> @@ -2246,6 +2274,21 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      }
>
>      /**
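> +     * Adds a {@link LoopDefinition} in do-while mode as an output and returns an expression clause used to configure its expression.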
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public ExpressionClause<LoopDefinition> loopDoWhile() {
> +        LoopDefinition loop = new LoopDefinition();
> +        loop.setDoWhile(true);
> +
> +        addOutput(loop);
> +
> +        return ExpressionClause.createAndSetExpression(loop);
> +    }
> +
> +    /**
>       * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
>       * Creates a loop allowing to process the a message a number of times and possibly process them
>       * in a different way.
> @@ -3094,6 +3137,26 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      }
>
>      /**
> +     * Adds a processor which sets the header on the IN message
> +     *
> +     * @param name  the header name
> +     * @param supplier the supplier used to set the header
> +     * @return the builder
> +     */
> +    @SuppressWarnings("unchecked")
> +    public Type setHeader(String name, final Supplier<Object> supplier) {
> +        SetHeaderDefinition answer = new SetHeaderDefinition(name, new ExpressionAdapter() {
> +            @Override
> +            public Object evaluate(Exchange exchange) {
> +                return supplier.get();
> +            }
> +        });
> +
> +        addOutput(answer);
> +        return (Type) this;
> +    }
> +
> +    /**
>       * Adds a processor which sets the header on the OUT message
>       *
>       * @param name  the header name
> @@ -4021,5 +4084,4 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      public String getLabel() {
>          return "";
>      }
> -
>  }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> index ce3fdca..6e5967e 100644
> --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> @@ -130,6 +130,24 @@ public final class ExchangeHelper {
>      }
>
>      /**
> +     * Gets a header or property of the correct type
> +     *
> +     * @param exchange      the exchange
> +     * @param name          the name of the header or the property
> +     * @param type          the type
> +     * @return the header value, or the property value if the header lookup returns null
> +     * @throws TypeConversionException is thrown if error during type conversion
> +     * @throws NoSuchHeaderException is thrown if no headers exists
> +     */
> +    public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException {
> +        T answer = exchange.getIn().getHeader(name, type);
> +        if (answer == null) {
> +            answer = exchange.getProperty(name, type);
> +        }
> +        return answer;
> +    }
> +
> +    /**
>       * Returns the mandatory inbound message body of the correct type or throws
>       * an exception if it is not present
>       *
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
> new file mode 100644
> index 0000000..4f8f845
> --- /dev/null
> +++ b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
> @@ -0,0 +1,43 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.util.function;
> +
> +import java.util.Objects;
> +import java.util.concurrent.atomic.AtomicReference;
> +import java.util.function.Supplier;
> +
> +public final class Suppliers {
> +    private Suppliers() {
> +    }
> +
> +    public static <T> Supplier<T> memorize(Supplier<T> supplier) {
> +        final AtomicReference<T> valueHolder = new AtomicReference<>();
> +        return () -> {
> +            T supplied = valueHolder.get();
> +            if (supplied == null) {
> +                synchronized (valueHolder) {
> +                    supplied = valueHolder.get();
> +                    if (supplied == null) {
> +                        supplied = Objects.requireNonNull(supplier.get(), "Supplier should not return null");
> +                        valueHolder.lazySet(supplied);
> +                    }
> +                }
> +            }
> +            return supplied;
> +        };
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> index 43bd8ff..ee14ca0 100644
> --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> @@ -90,13 +90,18 @@ public class DefaultExchangeTest extends ExchangeTestSupport {
>          assertEquals(new Integer(123), exchange.getIn().getHeader("bar", Integer.class));
>          assertEquals("123", exchange.getIn().getHeader("bar", String.class));
>          assertEquals(123, exchange.getIn().getHeader("bar", 234));
> +        assertEquals(123, exchange.getIn().getHeader("bar", () -> 456));
> +        assertEquals(456, exchange.getIn().getHeader("baz", () -> 456));
>
>          assertEquals(123, exchange.getIn().getHeader("bar", 234));
>          assertEquals(new Integer(123), exchange.getIn().getHeader("bar", 234, Integer.class));
>          assertEquals("123", exchange.getIn().getHeader("bar", "234", String.class));
> +        assertEquals("123", exchange.getIn().getHeader("bar", () -> "456", String.class));
> +        assertEquals("456", exchange.getIn().getHeader("baz", () -> "456", String.class));
>
>          assertEquals(234, exchange.getIn().getHeader("cheese", 234));
>          assertEquals("234", exchange.getIn().getHeader("cheese", 234, String.class));
> +        assertEquals("456", exchange.getIn().getHeader("cheese", () -> 456, String.class));
>      }
>
>      public void testProperty() throws Exception {
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
> new file mode 100644
> index 0000000..4f68bc0
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
> @@ -0,0 +1,58 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.Exchange;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.util.ExchangeHelper;
> +
> +public class DynamicRouter4Test extends ContextTestSupport {
> +    public void testDynamicRouter() throws Exception {
> +        getMockEndpoint("mock:a").expectedMessageCount(1);
> +        getMockEndpoint("mock:b").expectedMessageCount(1);
> +        getMockEndpoint("mock:c").expectedMessageCount(1);
> +
> +        template.sendBody("direct:start-1", "Hello World");
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                from("direct:start-1")
> +                    .dynamicRouter()
> +                        .exchange(DynamicRouter4Test::slip);
> +            }
> +        };
> +    }
> +
> +    public static String slip(Exchange exchange) {
> +        String previous = ExchangeHelper.getHeaderOrProperty(exchange, Exchange.SLIP_ENDPOINT, String.class);
> +        if (previous == null) {
> +            return "mock:a,mock:b";
> +        } else if ("mock://b".equals(previous)) {
> +            return "mock:c";
> +        }
> +
> +        // no more so return null
> +        return null;
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
> new file mode 100644
> index 0000000..9afd3f9
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
> @@ -0,0 +1,53 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
> +
> +public class IdempotentConsumerDslTest extends ContextTestSupport {
> +
> +    public void testDuplicateMessages() throws Exception {
> +        MockEndpoint mock = getMockEndpoint("mock:result");
> +        mock.expectedBodiesReceived("one", "two", "three");
> +
> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
> +        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
> +        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
> +        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
> +
> +        mock.assertIsSatisfied();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() {
> +                from("direct:start")
> +                    .idempotentConsumer()
> +                        .message(m -> m.getHeader("messageId"))
> +                        .messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200))
> +                    .to("mock:result");
> +            }
> +        };
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> index 2ef927a..54b5f6a 100644
> --- a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> +++ b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> @@ -21,11 +21,20 @@ import org.apache.camel.builder.RouteBuilder;
>
>  public class LoopDoWhileTest extends ContextTestSupport {
>
> -    public void testLoopDoWhile() throws Exception {
> +    public void testLoopDoWhileSimple() throws Exception {
>          getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
>          getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
>
> -        template.sendBody("direct:start", "A");
> +        template.sendBody("direct:simple", "A");
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    public void testLoopDoWhileFunctional() throws Exception {
> +        getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
> +        getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
> +
> +        template.sendBody("direct:functional", "A");
>
>          assertMockEndpointsSatisfied();
>      }
> @@ -35,12 +44,20 @@ public class LoopDoWhileTest extends ContextTestSupport {
>          return new RouteBuilder() {
>              @Override
>              public void configure() throws Exception {
> -                from("direct:start")
> +                from("direct:simple")
>                      .loopDoWhile(simple("${body.length} <= 5"))
>                          .to("mock:loop")
>                          .transform(body().append("A"))
>                      .end()
>                      .to("mock:result");
> +                from("direct:functional")
> +                    .loopDoWhile()
> +                        .body(String.class, b -> b.length() <= 5)
> +                        .to("mock:loop")
> +                        .transform()
> +                            .body(String.class, b -> b += "A")
> +                    .end()
> +                    .to("mock:result");
>              }
>          };
>      }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
> new file mode 100644
> index 0000000..a800e36
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
> @@ -0,0 +1,69 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class MulticastDslTest extends ContextTestSupport {
> +    public void testMulticastDsl() throws Exception {
> +        MockEndpoint mock = getMockEndpoint("mock:result");
> +        mock.expectedMessageCount(1);
> +        mock.expectedHeaderReceived("onPrepare", true);
> +        mock.expectedBodiesReceived(5);
> +
> +        template.sendBody("direct:start", 1);
> +
> +        mock.assertIsSatisfied();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                from("direct:start")
> +                    .multicast()
> +                        .onPrepare()
> +                            .message(m -> m.setHeader("onPrepare", true))
> +                        .aggregationStrategy()
> +                            .body(Integer.class, (o, n) -> o != null ? o + n : n)
> +                        .to("direct:increase-by-1")
> +                        .to("direct:increase-by-2")
> +                        .end()
> +                    .to("mock:result");
> +
> +                from("direct:increase-by-1")
> +                    .bean(new Increase(1));
> +                from("direct:increase-by-2")
> +                    .bean(new Increase(2));
> +            }
> +        };
> +    }
> +
> +    public static class Increase {
> +        private final int amount;
> +        public Increase(int amount) {
> +            this.amount = amount;
> +        }
> +
> +        public int add(int num) {
> +            return num + amount;
> +        }
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
> new file mode 100644
> index 0000000..9d296b8
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
> @@ -0,0 +1,49 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class RoutingSlipDslTest extends ContextTestSupport {
> +
> +    public void testRoutingSlipDsl() throws Exception {
> +        MockEndpoint x = getMockEndpoint("mock:x");
> +        MockEndpoint y = getMockEndpoint("mock:y");
> +        MockEndpoint z = getMockEndpoint("mock:z");
> +
> +        x.expectedBodiesReceived("foo", "bar");
> +        y.expectedBodiesReceived("foo", "bar");
> +        z.expectedBodiesReceived("foo", "bar");
> +
> +        template.sendBodyAndHeader("direct:a", "foo", "recipientListHeader", "mock:x,mock:y,mock:z");
> +        template.sendBodyAndHeader("direct:a", "bar", "recipientListHeader", "mock:x,mock:y,mock:z");
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    protected RouteBuilder createRouteBuilder() {
> +        return new RouteBuilder() {
> +            public void configure() {
> +                from("direct:a").routingSlip()
> +                    .message(m -> m.getHeader("recipientListHeader", String.class).split(","))
> +                    .end();
> +            }
> +        };
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
> new file mode 100644
> index 0000000..a971332
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
> @@ -0,0 +1,72 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import java.util.concurrent.ExecutorService;
> +import java.util.concurrent.Executors;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class ThrottlerDslTest extends ContextTestSupport {
> +    private static final int INTERVAL = 500;
> +    protected int messageCount = 9;
> +
> +    protected boolean canTest() {
> +        // skip test on Windows as it does not run well there
> +        return !isPlatform("windows");
> +    }
> +
> +    public void testDsl() throws Exception {
> +        if (!canTest()) {
> +            return;
> +        }
> +
> +        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
> +        resultEndpoint.expectedMessageCount(messageCount);
> +
> +        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
> +
> +        long start = System.currentTimeMillis();
> +        for (int i = 0; i < messageCount; i++) {
> +            executor.execute(() -> template.sendBodyAndHeader("direct:start", "payload", "ThrottleCount", 1));
> +        }
> +
> +        // let's wait for the exchanges to arrive
> +        resultEndpoint.assertIsSatisfied();
> +
> +        // now assert that they have actually been throttled
> +        long minimumTime = (messageCount - 1) * INTERVAL;
> +        // elapsed time plus 200ms of slack, so timer jitter does not fail the check below
> +        long delta = System.currentTimeMillis() - start + 200;
> +        assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
> +        executor.shutdownNow();
> +    }
> +
> +    protected RouteBuilder createRouteBuilder() {
> +        return new RouteBuilder() {
> +            public void configure() {
> +                from("direct:start")
> +                    .throttle()
> +                        .message(m -> m.getHeader("ThrottleCount", Integer.class))
> +                        .timePeriodMillis(INTERVAL)
> +                    .to("log:result", "mock:result");
> +            }
> +        };
> +    }
> +}
> \ No newline at end of file
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> index 55fd14e..f8d1db4 100644
> --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> @@ -21,20 +21,21 @@ import java.util.stream.Collectors;
>  import java.util.stream.Stream;
>
>  import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.Exchange;
>  import org.apache.camel.builder.RouteBuilder;
> -import org.apache.camel.component.mock.MockEndpoint;
>
>  public class AggregateDslTest extends ContextTestSupport {
>
>      public void testAggregate() throws Exception {
> -        MockEndpoint mock = getMockEndpoint("mock:aggregated");
> -        mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
> +        getMockEndpoint("mock:aggregated").expectedBodiesReceived("0,3", "1,4", "2,5");
> +        getMockEndpoint("mock:aggregated-supplier").expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
>
>          for (int i = 0; i < 9; i++) {
>              template.sendBodyAndHeader("direct:start", i, "type", i % 3);
> +            template.sendBodyAndHeader("direct:start-supplier", i, "type", i % 3);
>          }
>
> -        mock.assertIsSatisfied();
> +        assertMockEndpointsSatisfied();
>      }
>
>      @Override
> @@ -46,12 +47,38 @@ public class AggregateDslTest extends ContextTestSupport {
>                      .aggregate()
>                          .message(m -> m.getHeader("type"))
>                          .strategy()
> -                            .body(String.class, (o, n) ->  Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(",")))
> +                            .body(String.class, AggregateDslTest::joinString)
>                          .completion()
> -                            .body(String.class, s -> s.length() == 5)
> -                                    .to("mock:aggregated");
> +                            .body(String.class, s -> s.split(",").length == 2)
> +                    .to("mock:aggregated");
> +
> +                from("direct:start-supplier")
> +                    .aggregate()
> +                        .header("type")
> +                        .strategy(AggregateDslTest::joinStringStrategy)
> +                        .completion()
> +                            .body(String.class, s -> s.split(",").length == 3)
> +                    .to("mock:aggregated-supplier");
>              }
>          };
>      }
> +
> +    // *************************************************************************
> +    // Strategies
> +    // *************************************************************************
> +
> +    private static String joinString(String o, String n) {
> +        return Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(","));
> +    }
> +
> +    private static Exchange joinStringStrategy(Exchange oldExchange, Exchange newExchange) {
> +        newExchange.getIn().setBody(
> +            joinString(
> +                oldExchange != null ? oldExchange.getIn().getBody(String.class) : null,
> +                newExchange.getIn().getBody(String.class))
> +        );
> +
> +        return newExchange;
> +    }
>  }
>
>



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2