You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2016/11/08 11:15:56 UTC

Re: camel git commit: CAMEL-10438: Java8 DSL for Content Enricher and Aggregator

Hi

I think you merged this too quickly.

We have to be careful about letting the Java DSL drift too far away from
the XML DSL. The power of Camel is that those DSLs are intuitive and
interchangeable, so people from one world can easily move to the
other.

So if we get too many *new* Java 8-ish methods that are not in the XML DSL,
we confuse users and also ourselves.

I would like to ponder more on this before merging to master branch.


On Tue, Nov 8, 2016 at 11:39 AM,  <lb...@apache.org> wrote:
> Repository: camel
> Updated Branches:
>   refs/heads/master e4b1a5213 -> 40fa7f865
>
>
> CAMEL-10438: Java8 DSL for Content Enricher and Aggregator
>
>
> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/40fa7f86
> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/40fa7f86
> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/40fa7f86
>
> Branch: refs/heads/master
> Commit: 40fa7f86535261a2e685b69ca14c5920ff4e32df
> Parents: e4b1a52
> Author: lburgazzoli <lb...@gmail.com>
> Authored: Fri Nov 4 13:33:06 2016 +0100
> Committer: lburgazzoli <lb...@gmail.com>
> Committed: Tue Nov 8 11:02:57 2016 +0100
>
> ----------------------------------------------------------------------
>  .../builder/AggregationStrategyClause.java      | 116 +++++++++++++++++++
>  .../org/apache/camel/builder/EnrichClause.java  |  25 ++++
>  .../apache/camel/builder/PredicateClause.java   | 114 ++++++++++++++++++
>  .../apache/camel/model/AggregateDefinition.java |  46 ++++++++
>  .../apache/camel/model/ProcessorDefinition.java |  74 ++++++++++++
>  .../processor/aggregator/AggregateDslTest.java  |  57 +++++++++
>  .../processor/enricher/EnricherDslTest.java     |  53 +++++++++
>  7 files changed, 485 insertions(+)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java b/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
> new file mode 100644
> index 0000000..c72f904
> --- /dev/null
> +++ b/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
> @@ -0,0 +1,116 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.camel.builder;
> +
> +import java.util.function.BiFunction;
> +
> +import org.apache.camel.Exchange;
> +import org.apache.camel.Message;
> +import org.apache.camel.processor.aggregate.AggregationStrategy;
> +import org.apache.camel.util.ObjectHelper;
> +
> +public class AggregationStrategyClause<T> implements AggregationStrategy {
> +    private final T parent;
> +    private AggregationStrategy strategy;
> +
> +    public AggregationStrategyClause(T parent) {
> +        this.parent = parent;
> +        this.strategy = null;
> +    }
> +
> +    @Override
> +    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
> +        return ObjectHelper.notNull(strategy, "AggregationStrategy").aggregate(oldExchange, newExchange);
> +    }
> +
> +    // *******************************
> +    // Exchange
> +    // *******************************
> +
> +    /**
> +     * TODO: document
> +     *
> +     * Note: this is experimental and subject to changes in future releases.
> +     */
> +    public T exchange(final BiFunction<Exchange, Exchange, Exchange> function) {
> +        strategy = function::apply;
> +        return parent;
> +    }
> +
> +    // *******************************
> +    // Message
> +    // *******************************
> +
> +    /**
> +     * TODO: document
> +     *
> +     * Note: this is experimental and subject to changes in future releases.
> +     */
> +    public T message(final BiFunction<Message, Message, Message> function) {
> +        return exchange((Exchange oldExchange, Exchange newExchange) -> {
> +            Message oldMessage = oldExchange != null ? oldExchange.getIn() : null;
> +            Message newMessage = ObjectHelper.notNull(newExchange, "NewExchange").getIn();
> +            Message result = function.apply(oldMessage, newMessage);
> +
> +            if (oldExchange != null) {
> +                oldExchange.setIn(result);
> +                return oldExchange;
> +            } else {
> +                newExchange.setIn(result);
> +                return newExchange;
> +            }
> +        });
> +    }
> +
> +    // *******************************
> +    // Body
> +    // *******************************
> +
> +    /**
> +     * TODO: document
> +     *
> +     * Note: this is experimental and subject to changes in future releases.
> +     */
> +    public T body(final BiFunction<Object, Object, Object> function) {
> +        return body(Object.class, function);
> +    }
> +
> +    /**
> +     * TODO: document
> +     *
> +     * Note: this is experimental and subject to changes in future releases.
> +     */
> +    public <B> T body(final Class<B> type, final BiFunction<B, B, B> function) {
> +        return exchange((Exchange oldExchange, Exchange newExchange) -> {
> +            Message oldMessage = oldExchange != null ? oldExchange.getIn() : null;
> +            Message newMessage = ObjectHelper.notNull(newExchange, "NewExchange").getIn();
> +
> +            B result = function.apply(
> +                oldMessage != null ? oldMessage.getBody(type) : null,
> +                newMessage != null ? newMessage.getBody(type) : null);
> +
> +            if (oldExchange != null) {
> +                oldExchange.getIn().setBody(result);
> +                return oldExchange;
> +            } else {
> +                newExchange.getIn().setBody(result);
> +                return newExchange;
> +            }
> +        });
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java b/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java
> new file mode 100644
> index 0000000..bc1f78b
> --- /dev/null
> +++ b/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java
> @@ -0,0 +1,25 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.builder;
> +
> +import org.apache.camel.model.ProcessorDefinition;
> +
> +public class EnrichClause<T extends ProcessorDefinition<?>> extends AggregationStrategyClause<T> {
> +    public EnrichClause(T parent) {
> +        super(parent);
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java b/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java
> new file mode 100644
> index 0000000..d44d2b3
> --- /dev/null
> +++ b/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java
> @@ -0,0 +1,114 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.camel.builder;
> +
> +import java.util.Map;
> +import java.util.function.BiPredicate;
> +import java.util.function.Predicate;
> +
> +import org.apache.camel.Exchange;
> +import org.apache.camel.Message;
> +
> +public class PredicateClause<T> implements org.apache.camel.Predicate {
> +    private final T parent;
> +    private Predicate<Exchange> predicate;
> +
> +    public PredicateClause(T parent) {
> +        this.parent = parent;
> +        this.predicate = null;
> +    }
> +
> +    @Override
> +    public boolean matches(Exchange exchange) {
> +        return (predicate != null) ?  predicate.test(exchange) : false;
> +    }
> +
> +    // *******************************
> +    // Exchange
> +    // *******************************
> +
> +    /**
> +     * TODO: document
> +     *
> +     * Note: this is experimental and subject to changes in future releases.
> +     */
> +    public T exchange(final Predicate<Exchange> predicate) {
> +        this.predicate = predicate::test;
> +        return parent;
> +    }
> +
> +
> +    // *******************************
> +    // Message
> +    // *******************************
> +
> +    /**
> +     * TODO: document
> +     *
> +     * Note: this is experimental and subject to changes in future releases.
> +     */
> +    public T message(final Predicate<Message> predicate) {
> +        return exchange(e -> predicate.test(e.getIn()));
> +    }
> +
> +    // *******************************
> +    // Body
> +    // *******************************
> +
> +    /**
> +     * TODO: document
> +     *
> +     * Note: this is experimental and subject to changes in future releases.
> +     */
> +    public T body(final Predicate<Object> predicate) {
> +        return exchange(e -> predicate.test(e.getIn().getBody()));
> +    }
> +
> +    /**
> +     * TODO: document
> +     *
> +     * Note: this is experimental and subject to changes in future releases.
> +     */
> +    public <B> T body(final Class<B> type, final Predicate<B> predicate) {
> +        return exchange(e -> predicate.test(e.getIn().getBody(type)));
> +    }
> +
> +    /**
> +     * TODO: document
> +     *
> +     * Note: this is experimental and subject to changes in future releases.
> +     */
> +    public T body(final BiPredicate<Object, Map<String, Object>> predicate) {
> +        return exchange(e -> predicate.test(
> +            e.getIn().getBody(),
> +            e.getIn().getHeaders())
> +        );
> +    }
> +
> +    /**
> +     * TODO: document
> +     *
> +     * Note: this is experimental and subject to changes in future releases.
> +     */
> +    public <B> T body(final Class<B> type, final BiPredicate<B, Map<String, Object>> predicate) {
> +        return exchange(e -> predicate.test(
> +            e.getIn().getBody(type),
> +            e.getIn().getHeaders())
> +        );
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> index e14118e..2b5b97c 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> @@ -32,7 +32,9 @@ import org.apache.camel.CamelContextAware;
>  import org.apache.camel.Expression;
>  import org.apache.camel.Predicate;
>  import org.apache.camel.Processor;
> +import org.apache.camel.builder.AggregationStrategyClause;
>  import org.apache.camel.builder.ExpressionClause;
> +import org.apache.camel.builder.PredicateClause;
>  import org.apache.camel.model.language.ExpressionDefinition;
>  import org.apache.camel.processor.CamelInternalProcessor;
>  import org.apache.camel.processor.aggregate.AggregateController;
> @@ -780,6 +782,28 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>      }
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public AggregationStrategyClause<AggregateDefinition> aggregationStrategy() {
> +        AggregationStrategyClause<AggregateDefinition> clause = new AggregationStrategyClause<>(this);
> +        setAggregationStrategy(clause);
> +        return clause;
> +    }
> +
> +    /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public AggregationStrategyClause<AggregateDefinition> strategy() {
> +        return aggregationStrategy();
> +    }
> +
> +    /**
>       * Sets the aggregate strategy to use
>       *
>       * @param aggregationStrategy  the aggregate strategy to use
> @@ -872,6 +896,28 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>      }
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public PredicateClause<AggregateDefinition> completionPredicate() {
> +        PredicateClause<AggregateDefinition> clause = new PredicateClause<>(this);
> +        completionPredicate(clause);
> +        return clause;
> +    }
> +
> +    /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public PredicateClause<AggregateDefinition> completion() {
> +        return completionPredicate();
> +    }
> +
> +    /**
>       * Indicates to complete all current aggregated exchanges when the context is stopped
>       */
>      public AggregateDefinition forceCompletionOnStop() {
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> index 2ce1fc6..3c04f4b 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> @@ -46,6 +46,7 @@ import org.apache.camel.Predicate;
>  import org.apache.camel.Processor;
>  import org.apache.camel.Route;
>  import org.apache.camel.builder.DataFormatClause;
> +import org.apache.camel.builder.EnrichClause;
>  import org.apache.camel.builder.ExpressionBuilder;
>  import org.apache.camel.builder.ExpressionClause;
>  import org.apache.camel.builder.ProcessClause;
> @@ -3319,6 +3320,42 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      }
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public EnrichClause<ProcessorDefinition<Type>> enrichWith(String resourceUri) {
> +        EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this);
> +        enrich(resourceUri, clause);
> +        return clause;
> +    }
> +
> +    /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public EnrichClause<ProcessorDefinition<Type>> enrichWith(String resourceUri, boolean aggregateOnException) {
> +        EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this);
> +        enrich(resourceUri, clause, aggregateOnException, false);
> +        return clause;
> +    }
> +
> +    /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public EnrichClause<ProcessorDefinition<Type>> enrichWith(String resourceUri, boolean aggregateOnException, boolean shareUnitOfWork) {
> +        EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this);
> +        enrich(resourceUri, clause, aggregateOnException, shareUnitOfWork);
> +        return clause;
> +    }
> +
> +    /**
>       * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
>       * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
>       *
> @@ -3525,6 +3562,43 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>          return pollEnrich(resourceUri, timeout, aggregationStrategyRef, false);
>      }
>
> +
> +    /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(String resourceUri) {
> +        EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this);
> +        pollEnrich(resourceUri, -1, clause, false);
> +        return clause;
> +    }
> +
> +    /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(String resourceUri, long timeout) {
> +        EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this);
> +        pollEnrich(resourceUri, timeout, clause, false);
> +        return clause;
> +    }
> +
> +    /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(String resourceUri, long timeout, boolean aggregateOnException) {
> +        EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this);
> +        pollEnrich(resourceUri, timeout, clause, aggregateOnException);
> +        return clause;
> +    }
> +
>      /**
>       * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
>       * enriches an exchange with additional data obtained from a <code>resourceUri</code>
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> new file mode 100644
> index 0000000..55fd14e
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> @@ -0,0 +1,57 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor.aggregator;
> +
> +import java.util.Objects;
> +import java.util.stream.Collectors;
> +import java.util.stream.Stream;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class AggregateDslTest extends ContextTestSupport {
> +
> +    public void testAggregate() throws Exception {
> +        MockEndpoint mock = getMockEndpoint("mock:aggregated");
> +        mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
> +
> +        for (int i = 0; i < 9; i++) {
> +            template.sendBodyAndHeader("direct:start", i, "type", i % 3);
> +        }
> +
> +        mock.assertIsSatisfied();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                from("direct:start")
> +                    .aggregate()
> +                        .message(m -> m.getHeader("type"))
> +                        .strategy()
> +                            .body(String.class, (o, n) ->  Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(",")))
> +                        .completion()
> +                            .body(String.class, s -> s.length() == 5)
> +                                    .to("mock:aggregated");
> +            }
> +        };
> +    }
> +}
> +
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java
> new file mode 100644
> index 0000000..ca7d8fd
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java
> @@ -0,0 +1,53 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor.enricher;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class EnricherDslTest extends ContextTestSupport {
> +
> +    public void testEnrich() throws Exception {
> +        MockEndpoint mock = getMockEndpoint("mock:enriched");
> +        mock.expectedBodiesReceived("res-1", "res-2", "res-3");
> +
> +        template.sendBody("direct:start", "1");
> +        template.sendBody("direct:start", "2");
> +        template.sendBody("direct:start", "3");
> +
> +        mock.assertIsSatisfied();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                from("direct:start")
> +                    .enrichWith("direct:resource")
> +                        .body(String.class, (o, n) -> n + o)
> +                    .to("mock:enriched");
> +
> +                // set an empty message
> +                from("direct:resource")
> +                    .transform()
> +                        .body(b -> "res-");
> +            }
> +        };
> +    }
> +}
>



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2