You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/13 14:37:52 UTC

[01/10] camel git commit: CAMEL-4596: pollEnrich supports dynamic uris.

Repository: camel
Updated Branches:
  refs/heads/master 71b43bc83 -> fe5960aef


CAMEL-4596: pollEnrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9fd4d549
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9fd4d549
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9fd4d549

Branch: refs/heads/master
Commit: 9fd4d549056c12754c0fa76e253e00580e8ceb7a
Parents: 71b43bc
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Jul 12 20:54:52 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 08:14:47 2015 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/ConsumerCache.java    |  1 -
 .../camel/model/PollEnrichDefinition.java       | 62 ++++++++++---
 .../apache/camel/model/ProcessorDefinition.java | 31 +++++++
 .../apache/camel/processor/PollEnricher.java    | 93 +++++++++++++++++---
 .../apache/camel/processor/SendProcessor.java   |  1 -
 .../enricher/PollEnrichExpressionTest.java      | 47 ++++++++++
 6 files changed, 211 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
index 6f60f46..d957efe 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
@@ -58,7 +58,6 @@ public class ConsumerCache extends ServiceSupport {
         this(source, camelContext, cache, camelContext.getPollingConsumerServicePool());
     }
 
-
     public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache, ServicePool<Endpoint, PollingConsumer> pool) {
         this.camelContext = camelContext;
         this.consumers = cache;

http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 7b6f9d1..a5d7cb1 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -19,12 +19,16 @@ package org.apache.camel.model;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElementRef;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Endpoint;
+import org.apache.camel.Expression;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
+import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
@@ -41,6 +45,8 @@ import org.apache.camel.util.ObjectHelper;
 @XmlRootElement(name = "pollEnrich")
 @XmlAccessorType(XmlAccessType.FIELD)
 public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinition> implements EndpointRequiredDefinition {
+    @XmlElementRef
+    private ExpressionDefinition expression;
     @XmlAttribute(name = "uri")
     private String resourceUri;
     // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref:
@@ -94,24 +100,33 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
-        if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef)) {
-            throw new IllegalArgumentException("Either uri or ref must be provided for resource endpoint");
+        if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef) && expression == null) {
+            throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
         }
 
         // lookup endpoint
-        Endpoint endpoint;
+        PollingConsumer consumer = null;
         if (resourceUri != null) {
-            endpoint = routeContext.resolveEndpoint(resourceUri);
-        } else {
-            endpoint = routeContext.resolveEndpoint(null, resourceRef);
+            Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
+            consumer = endpoint.createPollingConsumer();
+        } else if (resourceRef != null) {
+            Endpoint endpoint = routeContext.resolveEndpoint(null, resourceRef);
+            consumer = endpoint.createPollingConsumer();
         }
 
+        // if no timeout then we should block, and therefore use a negative timeout
+        long time = timeout != null ? timeout : -1;
+
+        // create the expression if any was configured
+        Expression exp = createResourceExpression(routeContext);
+
         PollEnricher enricher;
-        if (timeout != null) {
-            enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout);
+        if (exp != null) {
+            enricher = new PollEnricher(null, exp, time);
+        } else if (consumer != null) {
+            enricher = new PollEnricher(null, consumer, time);
         } else {
-            // if no timeout then we should block, and there use a negative timeout
-            enricher = new PollEnricher(null, endpoint.createPollingConsumer(), -1);
+            throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
         }
 
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
@@ -152,6 +167,20 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return strategy;
     }
 
+    /**
+     * Creates the {@link org.apache.camel.Expression} from the expression node to use to compute the endpoint to poll from.
+     *
+     * @param routeContext  the route context
+     * @return the created expression, or <tt>null</tt> if no expression configured
+     */
+    protected Expression createResourceExpression(RouteContext routeContext) {
+        if (expression != null) {
+            return expression.createExpression(routeContext);
+        } else {
+            return null;
+        }
+    }
+
     public String getResourceUri() {
         return resourceUri;
     }
@@ -257,4 +286,17 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
     public void setAggregateOnException(Boolean aggregateOnException) {
         this.aggregateOnException = aggregateOnException;
     }
+
+    public ExpressionDefinition getExpression() {
+        return expression;
+    }
+
+    /**
+     * Sets an expression to use for dynamically computing the endpoint to poll from.
+     * <p/>
+     * If this option is set, then <tt>resourceUri</tt> or <tt>resourceRef</tt> is not in use.
+     */
+    public void setExpression(ExpressionDefinition expression) {
+        this.expression = expression;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index c84ea47..b525c0f 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3453,6 +3453,37 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     }
 
     /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+     * <p/>
+     * The difference between this and {@link #enrich(String)} is that this uses a consumer
+     * to obtain the additional data, whereas enrich uses a producer.
+     * <p/>
+     * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+     * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
+     * otherwise we use <tt>receive(timeout)</tt>.
+     *
+     * @param expression             to use an expression to dynamically compute the endpoint to poll from
+     * @param timeout                timeout in millis to wait at most for data to be available.
+     * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
+     * @param aggregateOnException   whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
+     *                               an exception was thrown.
+     * @return the builder
+     * @see org.apache.camel.processor.PollEnricher
+     */
+    @SuppressWarnings("unchecked")
+    public Type pollEnrich(Expression expression, long timeout, String aggregationStrategyRef, boolean aggregateOnException) {
+        PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
+        pollEnrich.setExpression(new ExpressionDefinition(expression));
+        pollEnrich.setTimeout(timeout);
+        pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
+        pollEnrich.setAggregateOnException(aggregateOnException);
+        addOutput(pollEnrich);
+        return (Type) this;
+    }
+
+    /**
      * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as
      * a callback when the {@link org.apache.camel.Exchange} has finished being processed.
      * The hook invoke callbacks for either onComplete or onFailure.

http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 9cbca74..ab313fb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -18,11 +18,15 @@ package org.apache.camel.processor;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointAware;
 import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.impl.ConsumerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ServiceSupport;
@@ -46,12 +50,15 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
  *
  * @see Enricher
  */
-public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware {
+public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware, CamelContextAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
+    private CamelContext camelContext;
+    private ConsumerCache consumerCache;
     private String id;
     private AggregationStrategy aggregationStrategy;
-    private PollingConsumer consumer;
+    private final PollingConsumer consumer;
+    private final Expression expression;
     private long timeout;
     private boolean aggregateOnException;
 
@@ -77,9 +84,32 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
     public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
         this.aggregationStrategy = aggregationStrategy;
         this.consumer = consumer;
+        this.expression = null;
         this.timeout = timeout;
     }
 
+    /**
+     * Creates a new {@link PollEnricher}.
+     *
+     * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
+     * @param expression expression to use to compute the endpoint to poll from.
+     * @param timeout timeout in millis
+     */
+    public PollEnricher(AggregationStrategy aggregationStrategy, Expression expression, long timeout) {
+        this.aggregationStrategy = aggregationStrategy;
+        this.expression = expression;
+        this.consumer = null;
+        this.timeout = timeout;
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
     public String getId() {
         return id;
     }
@@ -89,7 +119,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
     }
 
     public Endpoint getEndpoint() {
-        return consumer.getEndpoint();
+        return consumer != null ? consumer.getEndpoint() : null;
     }
 
     public AggregationStrategy getAggregationStrategy() {
@@ -162,17 +192,35 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
             return true;
         }
 
+        // which consumer to use (the fixed consumer, unless an expression computes one below)
+        PollingConsumer target = consumer;
+        Endpoint endpoint = null;
+
+        // use dynamic endpoint so calculate the endpoint to use
+        if (expression != null) {
+            try {
+                Object recipient = expression.evaluate(exchange, Object.class);
+                endpoint = resolveEndpoint(exchange, recipient);
+                // acquire the consumer from the cache
+                target = consumerCache.acquirePollingConsumer(endpoint);
+            } catch (Throwable e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+        }
+
         Exchange resourceExchange;
         try {
             if (timeout < 0) {
-                LOG.debug("Consumer receive: {}", consumer);
-                resourceExchange = consumer.receive();
+                LOG.debug("Consumer receive: {}", target);
+                resourceExchange = target.receive();
             } else if (timeout == 0) {
-                LOG.debug("Consumer receiveNoWait: {}", consumer);
-                resourceExchange = consumer.receiveNoWait();
+                LOG.debug("Consumer receiveNoWait: {}", target);
+                resourceExchange = target.receiveNoWait();
             } else {
-                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
-                resourceExchange = consumer.receive(timeout);
+                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, target);
+                resourceExchange = target.receive(timeout);
             }
 
             if (resourceExchange == null) {
@@ -184,6 +232,11 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
             exchange.setException(new CamelExchangeException("Error during poll", exchange, e));
             callback.done(true);
             return true;
+        } finally {
+            // return the consumer back to the cache
+            if (expression != null) {
+                consumerCache.releasePollingConsumer(endpoint, target);
+            }
         }
 
         try {
@@ -209,9 +262,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
 
             // set header with the uri of the endpoint enriched so we can use that for tracing etc
             if (exchange.hasOut()) {
-                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
+                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
             } else {
-                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
+                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
             }
         } catch (Throwable e) {
             exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
@@ -223,6 +276,14 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
         return true;
     }
 
+    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof String) {
+            recipient = ((String)recipient).trim();
+        }
+        return ExchangeHelper.resolveEndpoint(exchange, recipient);
+    }
+
     /**
      * Strategy to pre check polling.
      * <p/>
@@ -251,11 +312,19 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
     }
 
     protected void doStart() throws Exception {
-        ServiceHelper.startServices(aggregationStrategy, consumer);
+        if (expression != null && consumerCache == null) {
+            // create consumer cache if we use dynamic expressions for computing the endpoints to poll
+            consumerCache = new ConsumerCache(this, getCamelContext());
+        }
+        ServiceHelper.startServices(consumerCache, consumer, aggregationStrategy);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(consumer, aggregationStrategy);
+        ServiceHelper.stopServices(consumerCache, consumer, aggregationStrategy);
+    }
+
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(consumerCache, consumer, aggregationStrategy);
     }
 
     private static class CopyAggregationStrategy implements AggregationStrategy {

http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
index 884f674..3ab90d6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -118,7 +118,6 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Tra
             return true;
         }
 
-
         // we should preserve existing MEP so remember old MEP
         // if you want to permanently to change the MEP then use .setExchangePattern in the DSL
         final ExchangePattern existingPattern = exchange.getPattern();

http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
new file mode 100644
index 0000000..4e983a7
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class PollEnrichExpressionTest extends ContextTestSupport {
+
+    public void testPollEnricExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+
+        template.sendBody("seda:foo", "Hello World");
+        template.sendBody("seda:bar", "Bye World");
+
+        template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
+        template.sendBodyAndHeader("direct:start", null, "source", "seda:bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .pollEnrich(header("source"), 1000, null, false)
+                    .to("mock:result");
+            }
+        };
+    }
+}


[03/10] camel git commit: CAMEL-4596: pollEnrich supports dynamic uris.

Posted by da...@apache.org.
CAMEL-4596: pollEnrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b4af69c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b4af69c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b4af69c

Branch: refs/heads/master
Commit: 1b4af69c1bb2b03057feffc4ddd4e84baa2fba0b
Parents: b5be4d6
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 10:42:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 10:42:20 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/impl/DefaultCamelContext.java  |  1 +
 .../apache/camel/impl/EmptyConsumerCache.java   | 76 ++++++++++++++++++++
 .../camel/model/PollEnrichDefinition.java       | 24 +++++++
 .../apache/camel/processor/PollEnricher.java    | 21 +++++-
 .../processor/RecipientListNoCacheTest.java     |  2 +-
 .../PollEnrichExpressionNoCacheTest.java        | 47 ++++++++++++
 .../enricher/PollEnrichExpressionTest.java      |  6 +-
 .../camel/spring/processor/pollEnricherRef.xml  |  8 +--
 8 files changed, 177 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 7056932..c24674e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -2720,6 +2720,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         // special for executorServiceManager as want to stop it manually
         doAddService(executorServiceManager, false);
         addService(producerServicePool);
+        addService(pollingConsumerServicePool);
         addService(inflightRepository);
         addService(asyncProcessorAwaitManager);
         addService(shutdownStrategy);

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java
new file mode 100644
index 0000000..219371a
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateConsumerException;
+import org.apache.camel.IsSingleton;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link ConsumerCache} which is always empty and does not cache any {@link org.apache.camel.PollingConsumer}s.
+ */
+public class EmptyConsumerCache extends ConsumerCache {
+
+    public EmptyConsumerCache(Object source, CamelContext camelContext) {
+        super(source, camelContext, 0);
+    }
+
+    @Override
+    public PollingConsumer acquirePollingConsumer(Endpoint endpoint) {
+        // always create a new consumer
+        PollingConsumer answer;
+        try {
+            answer = endpoint.createPollingConsumer();
+            boolean singleton = true;
+            if (answer instanceof IsSingleton) {
+                singleton = ((IsSingleton) answer).isSingleton();
+            }
+            if (getCamelContext().isStartingRoutes() && singleton) {
+                // if we are currently starting a route, then add as service and enlist in JMX
+                // - but do not enlist non-singletons in JMX
+                // - note addService will also start the service
+                getCamelContext().addService(answer);
+            } else {
+                // must then start service so consumer is ready to be used
+                ServiceHelper.startService(answer);
+            }
+        } catch (Exception e) {
+            throw new FailedToCreateConsumerException(endpoint, e);
+        }
+        return answer;
+    }
+
+    @Override
+    public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) {
+        // stop and shutdown the consumer as it's not cached or reused
+        try {
+            ServiceHelper.stopAndShutdownService(pollingConsumer);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "EmptyConsumerCache for source: " + getSource();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index eb1247b..18801d4 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -52,6 +52,8 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
     private Boolean aggregateOnException;
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
+    @XmlAttribute
+    private Integer cacheSize;
 
     public PollEnrichDefinition() {
     }
@@ -89,6 +91,9 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
         if (getAggregateOnException() != null) {
             enricher.setAggregateOnException(getAggregateOnException());
         }
+        if (getCacheSize() != null) {
+            enricher.setCacheSize(getCacheSize());
+        }
 
         return enricher;
     }
@@ -186,6 +191,18 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
         return this;
     }
 
+    /**
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used
+     * to cache and reuse consumers when using this pollEnrich, when uris are reused.
+     *
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public PollEnrichDefinition cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
     // Properties
     // -------------------------------------------------------------------------
 
@@ -237,4 +254,11 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
         this.aggregateOnException = aggregateOnException;
     }
 
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 9873cbb..de3c7b4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.impl.EmptyConsumerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ServiceSupport;
@@ -59,6 +60,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
     private final Expression expression;
     private long timeout;
     private boolean aggregateOnException;
+    private int cacheSize;
 
     /**
      * Creates a new {@link PollEnricher}.
@@ -131,6 +133,14 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
         this.aggregationStrategy = defaultAggregationStrategy();
     }
 
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
     public void process(Exchange exchange) throws Exception {
         AsyncProcessorHelper.process(this, exchange);
     }
@@ -275,7 +285,16 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
     protected void doStart() throws Exception {
         if (consumerCache == null) {
             // create consumer cache if we use dynamic expressions for computing the endpoints to poll
-            consumerCache = new ConsumerCache(this, getCamelContext());
+            if (cacheSize < 0) {
+                consumerCache = new EmptyConsumerCache(this, camelContext);
+                LOG.debug("PollEnrich {} is not using ConsumerCache", this);
+            } else if (cacheSize == 0) {
+                consumerCache = new ConsumerCache(this, camelContext);
+                LOG.debug("PollEnrich {} using ConsumerCache with default cache size", this);
+            } else {
+                consumerCache = new ConsumerCache(this, camelContext, cacheSize);
+                LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize);
+            }
         }
         ServiceHelper.startServices(consumerCache, aggregationStrategy);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index 122e72b..dd4b699 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -49,7 +49,7 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 from("direct:a").recipientList(
-                        header("recipientListHeader").tokenize(",")).cacheSize(0);
+                        header("recipientListHeader").tokenize(",")).cacheSize(-1);
             }
         };
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java
new file mode 100644
index 0000000..5c125d6
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class PollEnrichExpressionNoCacheTest extends ContextTestSupport {
+
+    public void testPollEnricExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+
+        template.sendBody("seda:foo", "Hello World");
+        template.sendBody("seda:bar", "Bye World");
+
+        template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
+        template.sendBodyAndHeader("direct:start", null, "source", "seda:bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .pollEnrich().header("source").cacheSize(-1)
+                    .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
index 38a42ab..c14de38 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
@@ -21,14 +21,16 @@ import org.apache.camel.builder.RouteBuilder;
 
 public class PollEnrichExpressionTest extends ContextTestSupport {
 
-    public void testPollEnricExpression() throws Exception {
-        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+    public void testPollEnrichExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hi World");
 
         template.sendBody("seda:foo", "Hello World");
         template.sendBody("seda:bar", "Bye World");
+        template.sendBody("seda:foo", "Hi World");
 
         template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
         template.sendBodyAndHeader("direct:start", null, "source", "seda:bar");
+        template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
 
         assertMockEndpointsSatisfied();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
index c0e0c0b..3703be4 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
@@ -33,7 +33,7 @@
     <route>
       <from uri="direct:enricher-test-1"/>
       <pollEnrich strategyRef="sampleAggregator">
-        <ref>foo1</ref>
+        <simple>ref:foo1</simple>
       </pollEnrich>
       <to uri="mock:mock"/>
     </route>
@@ -42,7 +42,7 @@
     <route>
       <from uri="direct:enricher-test-2"/>
       <pollEnrich timeout="1000" strategyRef="sampleAggregator">
-        <ref>foo2</ref>
+        <simple>ref:foo2</simple>
       </pollEnrich>
       <to uri="mock:mock"/>
     </route>
@@ -50,7 +50,7 @@
     <route>
       <from uri="direct:enricher-test-3"/>
       <pollEnrich timeout="-1" strategyRef="sampleAggregator">
-        <ref>foo3</ref>
+        <simple>ref:foo3</simple>
       </pollEnrich>
       <to uri="mock:mock"/>
     </route>
@@ -58,7 +58,7 @@
     <route>
       <from uri="direct:enricher-test-4"/>
       <pollEnrich strategyRef="sampleAggregator">
-        <ref>foo4</ref>
+        <simple>ref:foo4</simple>
       </pollEnrich>
       <to uri="mock:mock"/>
     </route>


[05/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.

Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d918910
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d918910
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d918910

Branch: refs/heads/master
Commit: 2d9189100481e7ad305c600f014a3bf0d467bc3a
Parents: 5fdf8bc
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 11:35:42 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 11:35:42 2015 +0200

----------------------------------------------------------------------
 .../processor/SpringEnrichExpressionTest.java   | 30 ++++++++++++
 .../SpringEnricherAggregateOnExceptionTest.xml  |  8 +++-
 .../spring/processor/enrichExpressionTest.xml   | 50 ++++++++++++++++++++
 .../apache/camel/spring/processor/enricher.xml  |  4 +-
 .../camel/spring/processor/enricherref.xml      |  4 +-
 5 files changed, 92 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2d918910/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnrichExpressionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnrichExpressionTest.java
new file mode 100644
index 0000000..cd32b67
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnrichExpressionTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.enricher.EnrichExpressionTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringEnrichExpressionTest extends EnrichExpressionTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/enrichExpressionTest.xml");
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/2d918910/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringEnricherAggregateOnExceptionTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringEnricherAggregateOnExceptionTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringEnricherAggregateOnExceptionTest.xml
index 8084d3a..3be128e 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringEnricherAggregateOnExceptionTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringEnricherAggregateOnExceptionTest.xml
@@ -25,12 +25,16 @@
   <camelContext xmlns="http://camel.apache.org/schema/spring">
     <route>
       <from uri="direct:start"/>
-      <enrich uri="direct:foo" strategyRef="myAggregator" aggregateOnException="true"/>
+      <enrich strategyRef="myAggregator" aggregateOnException="true">
+        <constant>direct:foo</constant>
+      </enrich>
       <to uri="mock:result"/>
     </route>
     <route>
       <from uri="direct:start2"/>
-      <enrich uri="direct:foo" strategyRef="myAggregator" aggregateOnException="false"/>
+      <enrich strategyRef="myAggregator" aggregateOnException="false">
+        <constant>direct:foo</constant>
+      </enrich>
       <to uri="mock:result"/>
     </route>
     <route>

http://git-wip-us.apache.org/repos/asf/camel/blob/2d918910/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enrichExpressionTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enrichExpressionTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enrichExpressionTest.xml
new file mode 100644
index 0000000..91b7f42
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enrichExpressionTest.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+    <!-- START SNIPPET: e1 -->
+    <route>
+      <from uri="direct:start"/>
+      <enrich>
+        <header>source</header>
+      </enrich>
+      <to uri="mock:result"/>
+    </route>
+    <!-- END SNIPPET: e1 -->
+
+    <route>
+      <from uri="direct:foo"/>
+      <transform>
+        <constant>Hello World</constant>
+      </transform>
+    </route>
+    <route>
+      <from uri="direct:bar"/>
+      <transform>
+        <constant>Bye World</constant>
+      </transform>
+    </route>
+  </camelContext>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/camel/blob/2d918910/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
index 6d03630..918f926 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
@@ -26,7 +26,9 @@
   <camelContext xmlns="http://camel.apache.org/schema/spring">
     <route>
       <from uri="direct:start"/>
-      <enrich uri="direct:resource" strategyRef="sampleAggregator"/>
+      <enrich strategyRef="sampleAggregator">
+        <constant>direct:resource</constant>
+      </enrich>
       <to uri="mock:result"/>
     </route>
     <route>

http://git-wip-us.apache.org/repos/asf/camel/blob/2d918910/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricherref.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricherref.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricherref.xml
index 049d289..2825339 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricherref.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricherref.xml
@@ -30,7 +30,9 @@
 
         <route>
             <from uri="direct:start"/>
-            <enrich ref="cool" strategyRef="sampleAggregator"/>
+            <enrich strategyRef="sampleAggregator">
+              <simple>ref:cool</simple>
+            </enrich>
             <to uri="mock:result"/>
         </route>
         <route>


[04/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.

Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5fdf8bc5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5fdf8bc5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5fdf8bc5

Branch: refs/heads/master
Commit: 5fdf8bc5179326fec39f21508f6727db6f89e515
Parents: 1b4af69
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 11:09:30 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 11:09:30 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/model/EnrichDefinition.java    | 169 +++++++++----------
 .../camel/model/PollEnrichDefinition.java       |   2 -
 .../apache/camel/model/ProcessorDefinition.java |  93 +++++-----
 .../org/apache/camel/processor/Enricher.java    | 139 +++++++++------
 .../enricher/EnrichExpressionNoCacheTest.java   |  49 ++++++
 .../enricher/EnrichExpressionTest.java          |  49 ++++++
 6 files changed, 317 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 6f0e358..902524e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -23,14 +23,13 @@ import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
+import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.Enricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.ObjectHelper;
 
 /**
  * Enriches a message with data from a secondary resource
@@ -40,13 +39,7 @@ import org.apache.camel.util.ObjectHelper;
 @Metadata(label = "eip,transformation")
 @XmlRootElement(name = "enrich")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> implements EndpointRequiredDefinition {
-    @XmlAttribute(name = "uri")
-    private String resourceUri;
-    // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref:
-    @XmlAttribute(name = "ref")
-    @Deprecated
-    private String resourceRef;
+public class EnrichDefinition extends NoOutputExpressionNode {
     @XmlAttribute(name = "strategyRef")
     private String aggregationStrategyRef;
     @XmlAttribute(name = "strategyMethodName")
@@ -59,67 +52,43 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
     private AggregationStrategy aggregationStrategy;
     @XmlAttribute
     private Boolean shareUnitOfWork;
+    @XmlAttribute
+    private Integer cacheSize;
 
     public EnrichDefinition() {
-        this(null, null);
+        this(null);
     }
 
-    public EnrichDefinition(String resourceUri) {
-        this(null, resourceUri);
-    }
-    
-    public EnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri) {
+    public EnrichDefinition(AggregationStrategy aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
-        this.resourceUri = resourceUri;
     }
     
     @Override
     public String toString() {
-        return "Enrich[" + description() + " " + aggregationStrategy + "]";
-    }
-    
-    protected String description() {
-        return FromDefinition.description(resourceUri, resourceRef, (Endpoint) null);
+        return "Enrich[" + getExpression() + "]";
     }
     
     @Override
     public String getLabel() {
-        return "enrich[" + description() + "]";
-    }
-
-    @Override
-    public String getEndpointUri() {
-        if (resourceUri != null) {
-            return resourceUri;
-        } else {
-            return null;
-        }
+        return "enrich[" + getExpression() + "]";
     }
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
-        if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef)) {
-            throw new IllegalArgumentException("Either uri or ref must be provided for resource endpoint");
-        }
 
         // lookup endpoint
-        Endpoint endpoint;
-        if (resourceUri != null) {
-            endpoint = routeContext.resolveEndpoint(resourceUri);
-        } else {
-            endpoint = routeContext.resolveEndpoint(null, resourceRef);
-        }
+
+        Expression exp = getExpression().createExpression(routeContext);
         boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
 
-        Enricher enricher = new Enricher(null, endpoint.createProducer(), isShareUnitOfWork);
+        Enricher enricher = new Enricher(exp);
+        enricher.setShareUnitOfWork(isShareUnitOfWork);
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
-        if (strategy == null) {
-            enricher.setDefaultAggregationStrategy();
-        } else {
+        if (strategy != null) {
             enricher.setAggregationStrategy(strategy);
         }
-        if (getAggregateOnException() != null) {
-            enricher.setAggregateOnException(getAggregateOnException());
+        if (aggregateOnException != null) {
+            enricher.setAggregateOnException(aggregateOnException);
         }
         return enricher;
     }
@@ -149,39 +118,85 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
         return strategy;
     }
 
-    public String getResourceUri() {
-        return resourceUri;
+    // Fluent API
+    // -------------------------------------------------------------------------
+
+    /**
+     * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+     * By default Camel will use the reply from the external service as outgoing message.
+     */
+    public EnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy);
+        return this;
+    }
+
+    /**
+     * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+     * By default Camel will use the reply from the external service as outgoing message.
+     */
+    public EnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) {
+        setAggregationStrategyRef(aggregationStrategyRef);
+        return this;
     }
 
     /**
-     * The endpoint uri for the external service to enrich from. You must use either uri or ref.
+     * This option can be used to explicitly declare the method name to use, when using POJOs as the AggregationStrategy.
      */
-    public void setResourceUri(String resourceUri) {
-        this.resourceUri = resourceUri;
+    public EnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) {
+        setAggregationStrategyMethodName(aggregationStrategyMethodName);
+        return this;
     }
 
-    public String getResourceRef() {
-        return resourceRef;
+    /**
+     * If this option is false then the aggregate method is not used if there was no data to enrich.
+     * If this option is true then null values are used as the oldExchange (when no data to enrich),
+     * when using POJOs as the AggregationStrategy.
+     */
+    public EnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) {
+        setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull);
+        return this;
     }
 
     /**
-     * Refers to the endpoint for the external service to enrich from. You must use either uri or ref.
+     * If this option is false then the aggregate method is not used if there was an exception thrown while trying
+     * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
+     * to do if there was an exception in the aggregate method. For example to suppress the exception
+     * or set a custom message body etc.
+     */
+    public EnrichDefinition aggregateOnException(boolean aggregateOnException) {
+        setAggregateOnException(aggregateOnException);
+        return this;
+    }
+
+    /**
+     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and the resource exchange.
+     * Enrich will by default not share unit of work between the parent exchange and the resource exchange.
+     * This means the resource exchange has its own individual unit of work.
+     */
+    public EnrichDefinition shareUnitOfWork() {
+        setShareUnitOfWork(true);
+        return this;
+    }
+
+    /**
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * to cache and reuse producers when using this enrich, when uris are reused.
      *
-     * @deprecated use uri with ref:uri instead
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
      */
-    @Deprecated
-    public void setResourceRef(String resourceRef) {
-        this.resourceRef = resourceRef;
+    public EnrichDefinition cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
     }
 
+    // Properties
+    // -------------------------------------------------------------------------
+
     public String getAggregationStrategyRef() {
         return aggregationStrategyRef;
     }
 
-    /**
-     * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
-     * By default Camel will use the reply from the external service as outgoing message.
-     */
     public void setAggregationStrategyRef(String aggregationStrategyRef) {
         this.aggregationStrategyRef = aggregationStrategyRef;
     }
@@ -190,9 +205,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
         return aggregationStrategyMethodName;
     }
 
-    /**
-     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
-     */
     public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
         this.aggregationStrategyMethodName = aggregationStrategyMethodName;
     }
@@ -201,11 +213,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
         return aggregationStrategyMethodAllowNull;
     }
 
-    /**
-     * If this option is false then the aggregate method is not used if there was no data to enrich.
-     * If this option is true then null values is used as the oldExchange (when no data to enrich),
-     * when using POJOs as the AggregationStrategy.
-     */
     public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) {
         this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull;
     }
@@ -214,10 +221,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
         return aggregationStrategy;
     }
 
-    /**
-     * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
-     * By default Camel will use the reply from the external service as outgoing message.
-     */
     public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
     }
@@ -226,12 +229,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
         return aggregateOnException;
     }
 
-    /**
-     * If this option is false then the aggregate method is not used if there was an exception thrown while trying
-     * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
-     * to do if there was an exception in the aggregate method. For example to suppress the exception
-     * or set a custom message body etc.
-     */
     public void setAggregateOnException(Boolean aggregateOnException) {
         this.aggregateOnException = aggregateOnException;
     }
@@ -240,13 +237,15 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
         return shareUnitOfWork;
     }
 
-    /**
-     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and the resource exchange.
-     * Enrich will by default not share unit of work between the parent exchange and the resource exchange.
-     * This means the resource exchange has its own individual unit of work.
-     */
     public void setShareUnitOfWork(Boolean shareUnitOfWork) {
         this.shareUnitOfWork = shareUnitOfWork;
     }
 
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 18801d4..e7c84a6 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -126,8 +126,6 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
     // Fluent API
     // -------------------------------------------------------------------------
 
-    // TODO: add cacheSize option
-
     /**
      * Timeout in millis when polling from the external service.
      * <p/>

http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 589df86..920d85a 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3136,16 +3136,29 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     /**
      * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
      * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
+     * <p/>
+     * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
+     * to obtain the additional data, where as pollEnrich uses a polling consumer.
+     *
+     * @param resourceUri           URI of resource endpoint for obtaining additional data.
+     * @return the builder
+     * @see org.apache.camel.processor.Enricher
+     */
+    public Type enrich(String resourceUri) {
+        return enrich(resourceUri, null);
+    }
+
+    /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
      * 
      * @param resourceUri           URI of resource endpoint for obtaining additional data.
      * @param aggregationStrategy   aggregation strategy to aggregate input data and additional data.
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
-    @SuppressWarnings("unchecked")
     public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy) {
-        addOutput(new EnrichDefinition(aggregationStrategy, resourceUri));
-        return (Type) this;
+        return enrich(resourceUri, aggregationStrategy, false);
     }
 
     /**
@@ -3159,12 +3172,8 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
-    @SuppressWarnings("unchecked")
     public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException) {
-        EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, resourceUri);
-        enrich.setAggregateOnException(aggregateOnException);
-        addOutput(enrich);
-        return (Type) this;
+        return enrich(resourceUri, aggregationStrategy, aggregateOnException, false);
     }
 
     /**
@@ -3181,10 +3190,12 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException, boolean shareUnitOfWork) {
-        EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, resourceUri);
-        enrich.setAggregateOnException(aggregateOnException);
-        enrich.setShareUnitOfWork(shareUnitOfWork);
-        addOutput(enrich);
+        EnrichDefinition answer = new EnrichDefinition();
+        answer.setExpression(new ConstantExpression(resourceUri));
+        answer.setAggregationStrategy(aggregationStrategy);
+        answer.setAggregateOnException(aggregateOnException);
+        answer.setShareUnitOfWork(shareUnitOfWork);
+        addOutput(answer);
         return (Type) this;
     }
 
@@ -3193,16 +3204,15 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
      * <p/>
      * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
-     * to obatin the additional data, where as pollEnrich uses a polling consumer.
+     * to obtain the additional data, where as pollEnrich uses a polling consumer.
      *
-     * @param resourceUri           URI of resource endpoint for obtaining additional data.
+     * @param resourceRef            Reference of resource endpoint for obtaining additional data.
+     * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
-    @SuppressWarnings("unchecked")
-    public Type enrich(String resourceUri) {
-        addOutput(new EnrichDefinition(resourceUri));
-        return (Type) this;
+    public Type enrichRef(String resourceRef, String aggregationStrategyRef) {
+        return enrichRef(resourceRef, aggregationStrategyRef, false);
     }
 
     /**
@@ -3214,16 +3224,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      *
      * @param resourceRef            Reference of resource endpoint for obtaining additional data.
      * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
+     * @param aggregateOnException   whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
+     *                               an exception was thrown.
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
-    @SuppressWarnings("unchecked")
-    public Type enrichRef(String resourceRef, String aggregationStrategyRef) {
-        EnrichDefinition enrich = new EnrichDefinition();
-        enrich.setResourceRef(resourceRef);
-        enrich.setAggregationStrategyRef(aggregationStrategyRef);
-        addOutput(enrich);
-        return (Type) this;
+    public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException) {
+        return enrichRef(resourceRef, aggregationStrategyRef, aggregateOnException, false);
     }
 
     /**
@@ -3237,16 +3244,18 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
      * @param aggregateOnException   whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
      *                               an exception was thrown.
+     * @param shareUnitOfWork        whether to share unit of work
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
     @SuppressWarnings("unchecked")
-    public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException) {
-        EnrichDefinition enrich = new EnrichDefinition();
-        enrich.setResourceRef(resourceRef);
-        enrich.setAggregationStrategyRef(aggregationStrategyRef);
-        enrich.setAggregateOnException(aggregateOnException);
-        addOutput(enrich);
+    public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException, boolean shareUnitOfWork) {
+        EnrichDefinition answer = new EnrichDefinition();
+        answer.setExpression(new SimpleExpression("ref:" + resourceRef));
+        answer.setAggregationStrategyRef(aggregationStrategyRef);
+        answer.setAggregateOnException(aggregateOnException);
+        answer.setShareUnitOfWork(shareUnitOfWork);
+        addOutput(answer);
         return (Type) this;
     }
 
@@ -3257,23 +3266,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
      * to obtain the additional data, where as pollEnrich uses a polling consumer.
      *
-     * @param resourceRef            Reference of resource endpoint for obtaining additional data.
-     * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
-     * @param aggregateOnException   whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
-     *                               an exception was thrown.
-     * @param shareUnitOfWork        whether to share unit of work
-     * @return the builder
-     * @see org.apache.camel.processor.Enricher
+     * @return an expression builder clause to set the expression to use for computing the endpoint to use
+     * @see org.apache.camel.processor.Enricher
      */
-    @SuppressWarnings("unchecked")
-    public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException, boolean shareUnitOfWork) {
-        EnrichDefinition enrich = new EnrichDefinition();
-        enrich.setResourceRef(resourceRef);
-        enrich.setAggregationStrategyRef(aggregationStrategyRef);
-        enrich.setAggregateOnException(aggregateOnException);
-        enrich.setShareUnitOfWork(shareUnitOfWork);
-        addOutput(enrich);
-        return (Type) this;
+    public ExpressionClause<EnrichDefinition> enrich() {
+        EnrichDefinition answer = new EnrichDefinition();
+        addOutput(answer);
+        return ExpressionClause.createAndSetExpression(answer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index b0532ba..cbdf104 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -18,18 +18,19 @@ package org.apache.camel.processor;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
+import org.apache.camel.Expression;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EmptyProducerCache;
+import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.IdAware;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -39,6 +40,7 @@ import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
 
 /**
@@ -53,38 +55,28 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
  *
  * @see PollEnricher
  */
-public class Enricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware {
+public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(Enricher.class);
+    private CamelContext camelContext;
     private String id;
+    private ProducerCache producerCache;
+    private final Expression expression;
     private AggregationStrategy aggregationStrategy;
-    private Producer producer;
     private boolean aggregateOnException;
     private boolean shareUnitOfWork;
+    private int cacheSize;
 
-    /**
-     * Creates a new {@link Enricher}. The default aggregation strategy is to
-     * copy the additional data obtained from the enricher's resource over the
-     * input data. When using the copy aggregation strategy the enricher
-     * degenerates to a normal transformer.
-     * 
-     * @param producer producer to resource endpoint.
-     */
-    public Enricher(Producer producer) {
-        this(defaultAggregationStrategy(), producer, false);
+    public Enricher(Expression expression) {
+        this.expression = expression;
     }
 
-    /**
-     * Creates a new {@link Enricher}.
-     * 
-     * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
-     * @param producer producer to resource endpoint.
-     * @param shareUnitOfWork whether to share unit of work
-     */
-    public Enricher(AggregationStrategy aggregationStrategy, Producer producer, boolean shareUnitOfWork) {
-        this.aggregationStrategy = aggregationStrategy;
-        this.producer = producer;
-        this.shareUnitOfWork = shareUnitOfWork;
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
     }
 
     public String getId() {
@@ -95,11 +87,6 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
         this.id = id;
     }
 
-    /**
-     * Sets the aggregation strategy for this enricher.
-     *
-     * @param aggregationStrategy the aggregationStrategy to set
-     */
     public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
     }
@@ -112,23 +99,24 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
         return aggregateOnException;
     }
 
-    /**
-     * Whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
-     * an exception was thrown.
-     */
     public void setAggregateOnException(boolean aggregateOnException) {
         this.aggregateOnException = aggregateOnException;
     }
 
-    /**
-     * Sets the default aggregation strategy for this enricher.
-     */
-    public void setDefaultAggregationStrategy() {
-        this.aggregationStrategy = defaultAggregationStrategy();
+    public boolean isShareUnitOfWork() {
+        return shareUnitOfWork;
+    }
+
+    public void setShareUnitOfWork(boolean shareUnitOfWork) {
+        this.shareUnitOfWork = shareUnitOfWork;
     }
 
-    public Endpoint getEndpoint() {
-        return producer.getEndpoint();
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -148,6 +136,22 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
      * @param exchange input data.
      */
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        // which producer to use
+        final Producer producer;
+        final Endpoint endpoint;
+
+        // use dynamic endpoint so calculate the endpoint to use
+        try {
+            Object recipient = expression.evaluate(exchange, Object.class);
+            endpoint = resolveEndpoint(exchange, recipient);
+            // acquire the producer from the cache
+            producer = producerCache.acquireProducer(endpoint);
+        } catch (Throwable e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
         final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
         final Endpoint destination = producer.getEndpoint();
 
@@ -192,6 +196,13 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
                 // set property with the uri of the endpoint enriched so we can use that for tracing etc
                 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
 
+                // return the producer back to the cache
+                try {
+                    producerCache.releaseProducer(endpoint, producer);
+                } catch (Exception e) {
+                    // ignore
+                }
+
                 callback.done(false);
             }
         });
@@ -236,10 +247,25 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
         // set property with the uri of the endpoint enriched so we can use that for tracing etc
         exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
 
+        // return the producer back to the cache
+        try {
+            producerCache.releaseProducer(endpoint, producer);
+        } catch (Exception e) {
+            // ignore
+        }
+
         callback.done(true);
         return true;
     }
 
+    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof String) {
+            recipient = ((String)recipient).trim();
+        }
+        return ExchangeHelper.resolveEndpoint(exchange, recipient);
+    }
+
     /**
      * Creates a new {@link DefaultExchange} instance from the given
      * <code>exchange</code>. The resulting exchange's pattern is defined by
@@ -273,21 +299,34 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
         return new CopyAggregationStrategy();
     }
 
-    public boolean isShareUnitOfWork() {
-        return shareUnitOfWork;
-    }
-
     @Override
     public String toString() {
-        return "Enrich[" + producer.getEndpoint() + "]";
+        return "Enrich[" + expression + "]";
     }
 
     protected void doStart() throws Exception {
-        ServiceHelper.startServices(aggregationStrategy, producer);
+        if (aggregationStrategy == null) {
+            aggregationStrategy = defaultAggregationStrategy();
+        }
+
+        if (producerCache == null) {
+            if (cacheSize < 0) {
+                producerCache = new EmptyProducerCache(this, camelContext);
+                LOG.debug("Enricher {} is not using ProducerCache", this);
+            } else if (cacheSize == 0) {
+                producerCache = new ProducerCache(this, camelContext);
+                LOG.debug("Enricher {} using ProducerCache with default cache size", this);
+            } else {
+                producerCache = new ProducerCache(this, camelContext, cacheSize);
+                LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, cacheSize);
+            }
+        }
+
+        ServiceHelper.startServices(producerCache, aggregationStrategy);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(producer, aggregationStrategy);
+        ServiceHelper.stopServices(aggregationStrategy, producerCache);
     }
 
     private static class CopyAggregationStrategy implements AggregationStrategy {

http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java
new file mode 100644
index 0000000..12d4bd7
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class EnrichExpressionNoCacheTest extends ContextTestSupport {
+
+    public void testEnrichExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hello World");
+
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:bar");
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .enrich().header("source").cacheSize(-1)
+                    .to("mock:result");
+
+                from("direct:foo").transform().constant("Hello World");
+
+                from("direct:bar").transform().constant("Bye World");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java
new file mode 100644
index 0000000..0248884
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class EnrichExpressionTest extends ContextTestSupport {
+
+    public void testEnrichExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hello World");
+
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:bar");
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .enrich().header("source")
+                    .to("mock:result");
+
+                from("direct:foo").transform().constant("Hello World");
+
+                from("direct:bar").transform().constant("Bye World");
+            }
+        };
+    }
+}


[02/10] camel git commit: CAMEL-4596: pollEnrich supports dynamic uris.

Posted by da...@apache.org.
CAMEL-4596: pollEnrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b5be4d6e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b5be4d6e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b5be4d6e

Branch: refs/heads/master
Commit: b5be4d6ed829700ca9af852940dcf28f3c00b598
Parents: 9fd4d54
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 09:34:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 10:06:13 2015 +0200

----------------------------------------------------------------------
 .../camel/model/PollEnrichDefinition.java       | 184 ++++++-------------
 .../apache/camel/model/ProcessorDefinition.java |  43 +++--
 .../apache/camel/processor/PollEnricher.java    |  91 +++------
 .../enricher/PollEnrichExpressionTest.java      |   2 +-
 .../SpringPollEnrichExpressionTest.java         |  30 +++
 .../processor/pollEnrichExpressionTest.xml      |  37 ++++
 .../camel/spring/processor/pollEnricher.xml     |  66 ++++---
 .../camel/spring/processor/pollEnricherRef.xml  |  78 ++++----
 8 files changed, 267 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index a5d7cb1..eb1247b 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -19,22 +19,17 @@ package org.apache.camel.model;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlElementRef;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Expression;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
-import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.ObjectHelper;
 
 /**
  * Enriches messages with data polled from a secondary resource
@@ -44,15 +39,7 @@ import org.apache.camel.util.ObjectHelper;
 @Metadata(label = "eip,transformation")
 @XmlRootElement(name = "pollEnrich")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinition> implements EndpointRequiredDefinition {
-    @XmlElementRef
-    private ExpressionDefinition expression;
-    @XmlAttribute(name = "uri")
-    private String resourceUri;
-    // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref:
-    @XmlAttribute(name = "ref")
-    @Deprecated
-    private String resourceRef;
+public class PollEnrichDefinition extends NoOutputExpressionNode {
     @XmlAttribute @Metadata(defaultValue = "-1")
     private Long timeout;
     @XmlAttribute(name = "strategyRef")
@@ -69,65 +56,29 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
     public PollEnrichDefinition() {
     }
 
-    public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout) {
+    public PollEnrichDefinition(AggregationStrategy aggregationStrategy, long timeout) {
         this.aggregationStrategy = aggregationStrategy;
-        this.resourceUri = resourceUri;
         this.timeout = timeout;
     }
 
     @Override
     public String toString() {
-        return "PollEnrich[" + description() + " " + aggregationStrategy + "]";
+        return "PollEnrich[" + getExpression() + "]";
     }
     
-    protected String description() {
-        return FromDefinition.description(getResourceUri(), getResourceRef(), (Endpoint) null);
-    }
-
     @Override
     public String getLabel() {
-        return "pollEnrich[" + description() + "]";
-    }
-
-    @Override
-    public String getEndpointUri() {
-        if (resourceUri != null) {
-            return resourceUri;
-        } else {
-            return null;
-        }
+        return "pollEnrich[" + getExpression() + "]";
     }
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
-        if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef) && expression == null) {
-            throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
-        }
-
-        // lookup endpoint
-        PollingConsumer consumer = null;
-        if (resourceUri != null) {
-            Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
-            consumer = endpoint.createPollingConsumer();
-        } else if (resourceRef != null) {
-            Endpoint endpoint = routeContext.resolveEndpoint(null, resourceRef);
-            consumer = endpoint.createPollingConsumer();
-        }
 
         // if no timeout then we should block, and there use a negative timeout
         long time = timeout != null ? timeout : -1;
+        Expression exp = getExpression().createExpression(routeContext);
 
-        // create the expression if any was configured
-        Expression exp = createResourceExpression(routeContext);
-
-        PollEnricher enricher;
-        if (exp != null) {
-            enricher = new PollEnricher(null, exp, time);
-        } else if (consumer != null) {
-            enricher = new PollEnricher(null, consumer, time);
-        } else {
-            throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
-        }
+        PollEnricher enricher = new PollEnricher(exp, time);
 
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
         if (strategy == null) {
@@ -167,60 +118,81 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return strategy;
     }
 
+    // Fluent API
+    // -------------------------------------------------------------------------
+
+    // TODO: add cacheSize option
+
     /**
-     * Creates the {@link org.apache.camel.Expression} from the expression node to use to compute the endpoint to poll from.
-     *
-     * @param routeContext  the route context
-     * @return the created expression, or <tt>null</tt> if no expression configured
+     * Timeout in millis when polling from the external service.
+     * <p/>
+     * The timeout influences the poll enrich behavior. It basically operates in three different modes:
+     * <ul>
+     *     <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li>
+     *     <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li>
+     *     <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li>
+     * </ul>
+     * The default value is -1, so the method could block indefinitely; it is therefore recommended to use a timeout value
      */
-    protected Expression createResourceExpression(RouteContext routeContext) {
-        if (expression != null) {
-            return expression.createExpression(routeContext);
-        } else {
-            return null;
-        }
+    public PollEnrichDefinition timeout(long timeout) {
+        setTimeout(timeout);
+        return this;
     }
 
-    public String getResourceUri() {
-        return resourceUri;
+    /**
+     * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+     * By default Camel will use the reply from the external service as outgoing message.
+     */
+    public PollEnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy);
+        return this;
     }
 
     /**
-     * The endpoint uri for the external service to poll enrich from. You must use either uri or ref.
+     * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+     * By default Camel will use the reply from the external service as outgoing message.
      */
-    public void setResourceUri(String resourceUri) {
-        this.resourceUri = resourceUri;
+    public PollEnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) {
+        setAggregationStrategyRef(aggregationStrategyRef);
+        return this;
     }
 
-    public String getResourceRef() {
-        return resourceRef;
+    /**
+     * This option can be used to explicitly declare the method name to use, when using POJOs as the AggregationStrategy.
+     */
+    public PollEnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) {
+        setAggregationStrategyMethodName(aggregationStrategyMethodName);
+        return this;
+    }
+
+    /**
+     * If this option is false then the aggregate method is not used if there was no data to enrich.
+     * If this option is true then null values are used as the oldExchange (when no data to enrich),
+     * when using POJOs as the AggregationStrategy.
+     */
+    public PollEnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) {
+        setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull);
+        return this;
     }
 
     /**
-     * Refers to the endpoint for the external service to poll enrich from. You must use either uri or ref.
-     *
-     * @deprecated use uri with ref:uri instead
+     * If this option is false then the aggregate method is not used if there was an exception thrown while trying
+     * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
+     * to do if there was an exception in the aggregate method. For example to suppress the exception
+     * or set a custom message body etc.
      */
-    @Deprecated
-    public void setResourceRef(String resourceRef) {
-        this.resourceRef = resourceRef;
+    public PollEnrichDefinition aggregateOnException(boolean aggregateOnException) {
+        setAggregateOnException(aggregateOnException);
+        return this;
     }
 
+    // Properties
+    // -------------------------------------------------------------------------
+
     public Long getTimeout() {
         return timeout;
     }
 
-    /**
-     * Timeout in millis when polling from the external service.
-     * <p/>
-     * The timeout has influence about the poll enrich behavior. It basically operations in three different modes:
-     * <ul>
-     *     <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li>
-     *     <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li>
-     *     <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li>
-     * </ul>
-     * The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value
-     */
     public void setTimeout(Long timeout) {
         this.timeout = timeout;
     }
@@ -229,10 +201,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return aggregationStrategyRef;
     }
 
-    /**
-     * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
-     * By default Camel will use the reply from the external service as outgoing message.
-     */
     public void setAggregationStrategyRef(String aggregationStrategyRef) {
         this.aggregationStrategyRef = aggregationStrategyRef;
     }
@@ -241,9 +209,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return aggregationStrategyMethodName;
     }
 
-    /**
-     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
-     */
     public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
         this.aggregationStrategyMethodName = aggregationStrategyMethodName;
     }
@@ -252,11 +217,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return aggregationStrategyMethodAllowNull;
     }
 
-    /**
-     * If this option is false then the aggregate method is not used if there was no data to enrich.
-     * If this option is true then null values is used as the oldExchange (when no data to enrich),
-     * when using POJOs as the AggregationStrategy.
-     */
     public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) {
         this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull;
     }
@@ -265,10 +225,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return aggregationStrategy;
     }
 
-    /**
-     * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
-     * By default Camel will use the reply from the external service as outgoing message.
-     */
     public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
     }
@@ -277,26 +233,8 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return aggregateOnException;
     }
 
-    /**
-     * If this option is false then the aggregate method is not used if there was an exception thrown while trying
-     * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
-     * to do if there was an exception in the aggregate method. For example to suppress the exception
-     * or set a custom message body etc.
-     */
     public void setAggregateOnException(Boolean aggregateOnException) {
         this.aggregateOnException = aggregateOnException;
     }
 
-    public ExpressionDefinition getExpression() {
-        return expression;
-    }
-
-    /**
-     * Sets an expression to use for dynamic computing the endpoint to poll from.
-     * <p/>
-     * If this option is set, then <tt>resourceUri</tt> or <tt>resourceRef</tt> is not in use.
-     */
-    public void setExpression(ExpressionDefinition expression) {
-        this.expression = expression;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index b525c0f..589df86 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -52,6 +52,7 @@ import org.apache.camel.builder.ProcessorBuilder;
 import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.LanguageExpression;
+import org.apache.camel.model.language.SimpleExpression;
 import org.apache.camel.model.rest.RestDefinition;
 import org.apache.camel.processor.InterceptEndpointProcessor;
 import org.apache.camel.processor.Pipeline;
@@ -3292,8 +3293,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri) {
-        addOutput(new PollEnrichDefinition(null, resourceUri, -1));
-        return (Type) this;
+        return pollEnrich(resourceUri, null);
     }
 
     /**
@@ -3314,8 +3314,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, AggregationStrategy aggregationStrategy) {
-        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, -1));
-        return (Type) this;
+        return pollEnrich(resourceUri, -1, aggregationStrategy);
     }
 
     /**
@@ -3338,8 +3337,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy) {
-        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout));
-        return (Type) this;
+        return pollEnrich(resourceUri, timeout, aggregationStrategy, false);
     }
 
     /**
@@ -3364,7 +3362,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy, boolean aggregateOnException) {
-        PollEnrichDefinition pollEnrich = new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout);
+        PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
+        pollEnrich.setExpression(new ConstantExpression(resourceUri));
+        pollEnrich.setTimeout(timeout);
+        pollEnrich.setAggregationStrategy(aggregationStrategy);
         pollEnrich.setAggregateOnException(aggregateOnException);
         addOutput(pollEnrich);
         return (Type) this;
@@ -3389,8 +3390,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, long timeout) {
-        addOutput(new PollEnrichDefinition(null, resourceUri, timeout));
-        return (Type) this;
+        return pollEnrich(resourceUri, timeout, null);
     }
 
     /**
@@ -3414,7 +3414,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     @SuppressWarnings("unchecked")
     public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef) {
         PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
-        pollEnrich.setResourceRef(resourceRef);
+        pollEnrich.setExpression(new SimpleExpression("ref:" + resourceRef));
         pollEnrich.setTimeout(timeout);
         pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
         addOutput(pollEnrich);
@@ -3444,7 +3444,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     @SuppressWarnings("unchecked")
     public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef, boolean aggregateOnException) {
         PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
-        pollEnrich.setResourceRef(resourceRef);
+        pollEnrich.setExpression(new SimpleExpression("ref:" + resourceRef));
         pollEnrich.setTimeout(timeout);
         pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
         pollEnrich.setAggregateOnException(aggregateOnException);
@@ -3484,6 +3484,27 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     }
 
     /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+     * <p/>
+     * The difference between this and {@link #enrich(String)} is that this uses a consumer
+     * to obtain the additional data, whereas enrich uses a producer.
+     * <p/>
+     * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+     * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
+     * otherwise we use <tt>receive(timeout)</tt>.
+     *
+     * @return an expression builder clause to set the expression to use for computing the endpoint to poll from
+     * @see org.apache.camel.processor.PollEnricher
+     */
+    public ExpressionClause<PollEnrichDefinition> pollEnrich() {
+        PollEnrichDefinition answer = new PollEnrichDefinition();
+        addOutput(answer);
+        return ExpressionClause.createAndSetExpression(answer);
+    }
+
+    /**
      * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as
      * a callback when the {@link org.apache.camel.Exchange} has finished being processed.
      * The hook invoke callbacks for either onComplete or onFailure.

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index ab313fb..9873cbb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -22,7 +22,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.PollingConsumer;
@@ -50,55 +49,25 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
  *
  * @see Enricher
  */
-public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware, CamelContextAware {
+public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
     private CamelContext camelContext;
     private ConsumerCache consumerCache;
     private String id;
     private AggregationStrategy aggregationStrategy;
-    private final PollingConsumer consumer;
     private final Expression expression;
     private long timeout;
     private boolean aggregateOnException;
 
     /**
-     * Creates a new {@link PollEnricher}. The default aggregation strategy is to
-     * copy the additional data obtained from the enricher's resource over the
-     * input data. When using the copy aggregation strategy the enricher
-     * degenerates to a normal transformer.
-     *
-     * @param consumer consumer to resource endpoint.
-     */
-    public PollEnricher(PollingConsumer consumer) {
-        this(defaultAggregationStrategy(), consumer, 0);
-    }
-
-    /**
      * Creates a new {@link PollEnricher}.
      *
-     * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
-     * @param consumer consumer to resource endpoint.
-     * @param timeout timeout in millis
-     */
-    public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
-        this.aggregationStrategy = aggregationStrategy;
-        this.consumer = consumer;
-        this.expression = null;
-        this.timeout = timeout;
-    }
-
-    /**
-     * Creates a new {@link PollEnricher}.
-     *
-     * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
      * @param expression expression to use to compute the endpoint to poll from.
      * @param timeout timeout in millis
      */
-    public PollEnricher(AggregationStrategy aggregationStrategy, Expression expression, long timeout) {
-        this.aggregationStrategy = aggregationStrategy;
+    public PollEnricher(Expression expression, long timeout) {
         this.expression = expression;
-        this.consumer = null;
         this.timeout = timeout;
     }
 
@@ -118,10 +87,6 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
         this.id = id;
     }
 
-    public Endpoint getEndpoint() {
-        return consumer != null ? consumer.getEndpoint() : null;
-    }
-
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }
@@ -193,34 +158,32 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
         }
 
         // which consumer to use
-        PollingConsumer target = consumer;
-        Endpoint endpoint = null;
+        PollingConsumer consumer;
+        Endpoint endpoint;
 
         // use dynamic endpoint so calculate the endpoint to use
-        if (expression != null) {
-            try {
-                Object recipient = expression.evaluate(exchange, Object.class);
-                endpoint = resolveEndpoint(exchange, recipient);
-                // acquire the consumer from the cache
-                target = consumerCache.acquirePollingConsumer(endpoint);
-            } catch (Throwable e) {
-                exchange.setException(e);
-                callback.done(true);
-                return true;
-            }
+        try {
+            Object recipient = expression.evaluate(exchange, Object.class);
+            endpoint = resolveEndpoint(exchange, recipient);
+            // acquire the consumer from the cache
+            consumer = consumerCache.acquirePollingConsumer(endpoint);
+        } catch (Throwable e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
 
         Exchange resourceExchange;
         try {
             if (timeout < 0) {
-                LOG.debug("Consumer receive: {}", target);
+                LOG.debug("Consumer receive: {}", consumer);
                 resourceExchange = consumer.receive();
             } else if (timeout == 0) {
-                LOG.debug("Consumer receiveNoWait: {}", target);
-                resourceExchange = target.receiveNoWait();
+                LOG.debug("Consumer receiveNoWait: {}", consumer);
+                resourceExchange = consumer.receiveNoWait();
             } else {
-                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, target);
-                resourceExchange = target.receive(timeout);
+                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
+                resourceExchange = consumer.receive(timeout);
             }
 
             if (resourceExchange == null) {
@@ -234,9 +197,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
             return true;
         } finally {
             // return the consumer back to the cache
-            if (expression != null) {
-                consumerCache.releasePollingConsumer(endpoint, target);
-            }
+            consumerCache.releasePollingConsumer(endpoint, consumer);
         }
 
         try {
@@ -262,9 +223,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
 
             // set header with the uri of the endpoint enriched so we can use that for tracing etc
             if (exchange.hasOut()) {
-                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
+                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
             } else {
-                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
+                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
             }
         } catch (Throwable e) {
             exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
@@ -308,23 +269,23 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
 
     @Override
     public String toString() {
-        return "PollEnrich[" + consumer + "]";
+        return "PollEnrich[" + expression + "]";
     }
 
     protected void doStart() throws Exception {
-        if (expression != null && consumerCache == null) {
+        if (consumerCache == null) {
             // create consumer cache if we use dynamic expressions for computing the endpoints to poll
             consumerCache = new ConsumerCache(this, getCamelContext());
         }
-        ServiceHelper.startServices(consumerCache, consumer, aggregationStrategy);
+        ServiceHelper.startServices(consumerCache, aggregationStrategy);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(consumerCache, consumer, aggregationStrategy);
+        ServiceHelper.stopServices(aggregationStrategy, consumerCache);
     }
 
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(consumerCache, consumer, aggregationStrategy);
+        ServiceHelper.stopAndShutdownServices(aggregationStrategy, consumerCache);
     }
 
     private static class CopyAggregationStrategy implements AggregationStrategy {

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
index 4e983a7..38a42ab 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
@@ -39,7 +39,7 @@ public class PollEnrichExpressionTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .pollEnrich(header("source"), 1000, null, false)
+                    .pollEnrich().header("source")
                     .to("mock:result");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
new file mode 100644
index 0000000..8017e8d
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.enricher.PollEnrichExpressionTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringPollEnrichExpressionTest extends PollEnrichExpressionTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/pollEnrichExpressionTest.xml");
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
new file mode 100644
index 0000000..cb932e3
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+    <!-- START SNIPPET: e1 -->
+    <route>
+      <from uri="direct:start"/>
+      <pollEnrich>
+        <header>source</header>
+      </pollEnrich>
+      <to uri="mock:result"/>
+    </route>
+    <!-- END SNIPPET: e1 -->
+  </camelContext>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
index f63b63a..f0f71ce 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
@@ -22,34 +22,42 @@
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
 
-    <camelContext xmlns="http://camel.apache.org/schema/spring">
-        <!-- START SNIPPET: e1 -->
-        <route>
-            <from uri="direct:enricher-test-1"/>
-            <pollEnrich uri="seda:foo1" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-        <!-- END SNIPPET: e1 -->
-
-        <route>
-            <from uri="direct:enricher-test-2"/>
-            <pollEnrich uri="seda:foo2" timeout="1000" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-
-        <route>
-            <from uri="direct:enricher-test-3"/>
-            <pollEnrich uri="seda:foo3" timeout="-1" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-
-        <route>
-            <from uri="direct:enricher-test-4"/>
-            <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-    </camelContext>
-
-    <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+    <!-- START SNIPPET: e1 -->
+    <route>
+      <from uri="direct:enricher-test-1"/>
+      <pollEnrich strategyRef="sampleAggregator">
+        <constant>seda:foo1</constant>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+    <!-- END SNIPPET: e1 -->
+
+    <route>
+      <from uri="direct:enricher-test-2"/>
+      <pollEnrich timeout="1000" strategyRef="sampleAggregator">
+        <constant>seda:foo2</constant>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+
+    <route>
+      <from uri="direct:enricher-test-3"/>
+      <pollEnrich timeout="-1" strategyRef="sampleAggregator">
+        <constant>seda:foo3</constant>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+
+    <route>
+      <from uri="direct:enricher-test-4"/>
+      <pollEnrich strategyRef="sampleAggregator">
+        <constant>seda:foo4</constant>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+  </camelContext>
+
+  <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
 
 </beans>

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
index 6046e7d..c0e0c0b 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
@@ -22,40 +22,48 @@
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
 
-    <camelContext xmlns="http://camel.apache.org/schema/spring">
-
-        <endpoint id="foo1" uri="seda:foo1"/>
-        <endpoint id="foo2" uri="seda:foo2"/>
-        <endpoint id="foo3" uri="seda:foo3"/>
-        <endpoint id="foo4" uri="seda:foo4"/>
-
-        <!-- START SNIPPET: e1 -->
-        <route>
-            <from uri="direct:enricher-test-1"/>
-            <pollEnrich ref="foo1" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-        <!-- END SNIPPET: e1 -->
-
-        <route>
-            <from uri="direct:enricher-test-2"/>
-            <pollEnrich ref="foo2" timeout="1000" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-
-        <route>
-            <from uri="direct:enricher-test-3"/>
-            <pollEnrich ref="foo3" timeout="-1" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-
-        <route>
-            <from uri="direct:enricher-test-4"/>
-            <pollEnrich ref="foo4" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-    </camelContext>
-
-    <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+
+    <endpoint id="foo1" uri="seda:foo1"/>
+    <endpoint id="foo2" uri="seda:foo2"/>
+    <endpoint id="foo3" uri="seda:foo3"/>
+    <endpoint id="foo4" uri="seda:foo4"/>
+
+    <!-- START SNIPPET: e1 -->
+    <route>
+      <from uri="direct:enricher-test-1"/>
+      <pollEnrich strategyRef="sampleAggregator">
+        <ref>foo1</ref>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+    <!-- END SNIPPET: e1 -->
+
+    <route>
+      <from uri="direct:enricher-test-2"/>
+      <pollEnrich timeout="1000" strategyRef="sampleAggregator">
+        <ref>foo2</ref>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+
+    <route>
+      <from uri="direct:enricher-test-3"/>
+      <pollEnrich timeout="-1" strategyRef="sampleAggregator">
+        <ref>foo3</ref>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+
+    <route>
+      <from uri="direct:enricher-test-4"/>
+      <pollEnrich strategyRef="sampleAggregator">
+        <ref>foo4</ref>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+  </camelContext>
+
+  <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
 
 </beans>


[09/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.

Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fe8bf8d6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fe8bf8d6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fe8bf8d6

Branch: refs/heads/master
Commit: fe8bf8d64c6c57775610ca0b8c7b87bb4efb601a
Parents: b07edd0
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 12:50:27 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 12:50:27 2015 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/camel/model/ProcessorDefinition.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fe8bf8d6/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 920d85a..2359774 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3173,7 +3173,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * @see org.apache.camel.processor.Enricher
      */
     public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException) {
-        return enrich(resourceUri, aggregationStrategy, false, false);
+        return enrich(resourceUri, aggregationStrategy, aggregateOnException, false);
     }
 
     /**


[06/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.

Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/80dbbc40
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/80dbbc40
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/80dbbc40

Branch: refs/heads/master
Commit: 80dbbc4067f7afa767a62dc7daea92096c95c99a
Parents: 2d91891
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 12:09:56 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 12:09:56 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/component/cxf/GreeterEnrichRouterContext.xml    | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/80dbbc40/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/GreeterEnrichRouterContext.xml
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/GreeterEnrichRouterContext.xml b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/GreeterEnrichRouterContext.xml
index c6ff1c5..3773033 100644
--- a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/GreeterEnrichRouterContext.xml
+++ b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/GreeterEnrichRouterContext.xml
@@ -52,7 +52,9 @@
    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
     <route errorHandlerRef="noErrorHandler">
       <from ref="routerEndpoint" />
-      <enrich ref="serviceEndpoint" />
+      <enrich>
+        <simple>ref:serviceEndpoint</simple> 
+      </enrich>
     </route>
    </camelContext>
 


[07/10] camel git commit: CAMEL-8956: Add failsafe until we have better namespace parsing with factory beans.

Posted by da...@apache.org.
CAMEL-8956: Add failsafe until we have better namespace parsing with factory beans.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/39c51705
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/39c51705
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/39c51705

Branch: refs/heads/master
Commit: 39c5170589d1d15ccf6ca975f7cdbd046b54db87
Parents: 80dbbc4
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 12:10:28 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 12:10:28 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/component/cxf/CxfProducer.java    | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/39c51705/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
index 0aa4fa9..f778e5f 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
@@ -34,6 +34,7 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.cxf.Bus;
 import org.apache.cxf.binding.soap.model.SoapHeaderInfo;
 import org.apache.cxf.endpoint.Client;
@@ -72,6 +73,9 @@ public class CxfProducer extends DefaultProducer implements AsyncProcessor {
     
     @Override
     protected void doStart() throws Exception {
+        // failsafe as cxf may not ensure the endpoint is started (CAMEL-8956)
+        ServiceHelper.startService(endpoint);
+
         if (client == null) {
             client = endpoint.createClient();
         }


[10/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.

Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fe5960ae
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fe5960ae
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fe5960ae

Branch: refs/heads/master
Commit: fe5960aef0d8040b651762b6d3423a610f87176d
Parents: fe8bf8d
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 14:19:06 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 14:19:06 2015 +0200

----------------------------------------------------------------------
 .../org/apache/camel/scala/dsl/SEnrichDefinition.scala      | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fe5960ae/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SEnrichDefinition.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SEnrichDefinition.scala b/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SEnrichDefinition.scala
index 48a40f5..450fe84 100644
--- a/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SEnrichDefinition.scala
+++ b/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SEnrichDefinition.scala
@@ -27,7 +27,10 @@ case class SEnrichDefinition(override val target: EnrichDefinition)(implicit val
   
   def aggregationStrategy(strategy: AggregationStrategy) = wrap(target.setAggregationStrategy(strategy))
   def aggregationStrategyRef(ref: String) = wrap(target.setAggregationStrategyRef(ref))
-  
-  def resourceRef(ref: String) = wrap(target.setResourceRef(ref))
-  def resourceUri(resourceUri: String) = wrap(target.setResourceUri(resourceUri))
+  def aggregationStrategyMethodName(name: String) = wrap(target.setAggregationStrategyMethodName(name))
+  def aggregationStrategyMethodAllowNull(allowNull: Boolean) = wrap(target.setAggregationStrategyMethodAllowNull(allowNull))
+  def aggregateOnException(aggregateOnException: Boolean) = wrap(target.setAggregateOnException(aggregateOnException))
+  def shareUnitOfWork() = wrap(target.setShareUnitOfWork(true))
+  def cacheSize(size: Integer) = wrap(target.setCacheSize(size))
+
 }


[08/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.

Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b07edd0a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b07edd0a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b07edd0a

Branch: refs/heads/master
Commit: b07edd0a2f535ebb9d73bd31736ac838523e078b
Parents: 39c5170
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 12:48:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 12:48:20 2015 +0200

----------------------------------------------------------------------
 .../impl/MultipleLifecycleStrategyTest.java     |  2 +-
 ...roducerRouteAddRemoveRegisterAlwaysTest.java |  6 +--
 .../management/ManagedRouteAddRemoveTest.java   | 42 ++++++++++----------
 .../model/GatherAllStaticEndpointUrisTest.java  |  2 +-
 4 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b07edd0a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java b/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
index a3cf751..f39a1ba 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
@@ -50,7 +50,7 @@ public class MultipleLifecycleStrategyTest extends TestSupport {
         context.removeComponent("log");
         context.stop();
 
-        List<String> expectedEvents = Arrays.asList("onContextStart", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
+        List<String> expectedEvents = Arrays.asList("onContextStart", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
                 "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
         
         assertEquals(expectedEvents, dummy1.getEvents());

http://git-wip-us.apache.org/repos/asf/camel/blob/b07edd0a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
index 86ec502..606be4b 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
@@ -47,7 +47,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // number of producers
         ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -72,7 +72,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // but as its recipient list which is dynamic-to we add new producers because we have register always
         namesP = mbeanServer.queryNames(onP, null);
@@ -87,7 +87,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // and we still have the other producers, but not the one from the 2nd route that was removed
         namesP = mbeanServer.queryNames(onP, null);

http://git-wip-us.apache.org/repos/asf/camel/blob/b07edd0a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
index 46656de..ededf75 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
@@ -59,7 +59,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
         // number of services
         ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,*");
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // number of producers
         ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -84,7 +84,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // but we should have one more producer
         namesP = mbeanServer.queryNames(onP, null);
@@ -99,7 +99,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // and the 2nd producer should be removed
         namesP = mbeanServer.queryNames(onP, null);
@@ -119,7 +119,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // number of producers
         ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -144,7 +144,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // but as its recipient list which is dynamic-to we do not add a new producer
         namesP = mbeanServer.queryNames(onP, null);
@@ -159,7 +159,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // and we still have the original producer
         namesP = mbeanServer.queryNames(onP, null);
@@ -179,7 +179,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // number of producers
         ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -204,7 +204,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // but as its recipient list which is dynamic-to we do not add a new producer
         namesP = mbeanServer.queryNames(onP, null);
@@ -219,7 +219,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // and we still have the original producer
         namesP = mbeanServer.queryNames(onP, null);
@@ -239,7 +239,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         log.info("Adding 2nd route");
 
@@ -269,7 +269,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // now stop and remove the 2nd route
         log.info("Stopping 2nd route");
@@ -281,7 +281,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         log.info("Shutting down...");
     }
@@ -297,7 +297,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         log.info("Adding 2nd route");
 
@@ -328,7 +328,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // now stop and remove the 2nd route
         log.info("Stopping 2nd route");
@@ -340,7 +340,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         log.info("Shutting down...");
     }
@@ -356,7 +356,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         log.info("Adding 2nd route");
 
@@ -385,7 +385,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // now stop and remove the 2nd route
         log.info("Stopping 2nd route");
@@ -397,7 +397,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         log.info("Shutting down...");
     }
@@ -413,7 +413,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         log.info("Adding 2nd route");
 
@@ -443,7 +443,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         // now stop and remove the 2nd route
         log.info("Stopping 2nd route");
@@ -455,7 +455,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(9, names.size());
+        assertEquals(10, names.size());
 
         log.info("Shutting down...");
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b07edd0a/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java b/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java
index 4713c61..f423c84 100644
--- a/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java
+++ b/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java
@@ -27,7 +27,7 @@ public class GatherAllStaticEndpointUrisTest extends ContextTestSupport {
         RouteDefinition route = context.getRouteDefinition("foo");
         Set<String> uris = RouteDefinitionHelper.gatherAllStaticEndpointUris(context, route, true, true);
         assertNotNull(uris);
-        assertEquals(5, uris.size());
+        assertEquals(4, uris.size());
 
         RouteDefinition route2 = context.getRouteDefinition("bar");
         Set<String> uris2 = RouteDefinitionHelper.gatherAllStaticEndpointUris(context, route2, true, true);