You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/13 14:37:53 UTC

[02/10] camel git commit: CAMEL-4596: pollEnrich supports dynamic uris.

CAMEL-4596: pollEnrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b5be4d6e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b5be4d6e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b5be4d6e

Branch: refs/heads/master
Commit: b5be4d6ed829700ca9af852940dcf28f3c00b598
Parents: 9fd4d54
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 09:34:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 10:06:13 2015 +0200

----------------------------------------------------------------------
 .../camel/model/PollEnrichDefinition.java       | 184 ++++++-------------
 .../apache/camel/model/ProcessorDefinition.java |  43 +++--
 .../apache/camel/processor/PollEnricher.java    |  91 +++------
 .../enricher/PollEnrichExpressionTest.java      |   2 +-
 .../SpringPollEnrichExpressionTest.java         |  30 +++
 .../processor/pollEnrichExpressionTest.xml      |  37 ++++
 .../camel/spring/processor/pollEnricher.xml     |  66 ++++---
 .../camel/spring/processor/pollEnricherRef.xml  |  78 ++++----
 8 files changed, 267 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index a5d7cb1..eb1247b 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -19,22 +19,17 @@ package org.apache.camel.model;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlElementRef;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Expression;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
-import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.ObjectHelper;
 
 /**
  * Enriches messages with data polled from a secondary resource
@@ -44,15 +39,7 @@ import org.apache.camel.util.ObjectHelper;
 @Metadata(label = "eip,transformation")
 @XmlRootElement(name = "pollEnrich")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinition> implements EndpointRequiredDefinition {
-    @XmlElementRef
-    private ExpressionDefinition expression;
-    @XmlAttribute(name = "uri")
-    private String resourceUri;
-    // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref:
-    @XmlAttribute(name = "ref")
-    @Deprecated
-    private String resourceRef;
+public class PollEnrichDefinition extends NoOutputExpressionNode {
     @XmlAttribute @Metadata(defaultValue = "-1")
     private Long timeout;
     @XmlAttribute(name = "strategyRef")
@@ -69,65 +56,29 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
     public PollEnrichDefinition() {
     }
 
-    public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout) {
+    public PollEnrichDefinition(AggregationStrategy aggregationStrategy, long timeout) {
         this.aggregationStrategy = aggregationStrategy;
-        this.resourceUri = resourceUri;
         this.timeout = timeout;
     }
 
     @Override
     public String toString() {
-        return "PollEnrich[" + description() + " " + aggregationStrategy + "]";
+        return "PollEnrich[" + getExpression() + "]";
     }
     
-    protected String description() {
-        return FromDefinition.description(getResourceUri(), getResourceRef(), (Endpoint) null);
-    }
-
     @Override
     public String getLabel() {
-        return "pollEnrich[" + description() + "]";
-    }
-
-    @Override
-    public String getEndpointUri() {
-        if (resourceUri != null) {
-            return resourceUri;
-        } else {
-            return null;
-        }
+        return "pollEnrich[" + getExpression() + "]";
     }
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
-        if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef) && expression == null) {
-            throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
-        }
-
-        // lookup endpoint
-        PollingConsumer consumer = null;
-        if (resourceUri != null) {
-            Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
-            consumer = endpoint.createPollingConsumer();
-        } else if (resourceRef != null) {
-            Endpoint endpoint = routeContext.resolveEndpoint(null, resourceRef);
-            consumer = endpoint.createPollingConsumer();
-        }
 
         // if no timeout then we should block, and there use a negative timeout
         long time = timeout != null ? timeout : -1;
+        Expression exp = getExpression().createExpression(routeContext);
 
-        // create the expression if any was configured
-        Expression exp = createResourceExpression(routeContext);
-
-        PollEnricher enricher;
-        if (exp != null) {
-            enricher = new PollEnricher(null, exp, time);
-        } else if (consumer != null) {
-            enricher = new PollEnricher(null, consumer, time);
-        } else {
-            throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
-        }
+        PollEnricher enricher = new PollEnricher(exp, time);
 
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
         if (strategy == null) {
@@ -167,60 +118,81 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return strategy;
     }
 
+    // Fluent API
+    // -------------------------------------------------------------------------
+
+    // TODO: add cacheSize option
+
     /**
-     * Creates the {@link org.apache.camel.Expression} from the expression node to use to compute the endpoint to poll from.
-     *
-     * @param routeContext  the route context
-     * @return the created expression, or <tt>null</tt> if no expression configured
+     * Timeout in millis when polling from the external service.
+     * <p/>
+     * The timeout influences the poll enrich behavior. It basically operates in three different modes:
+     * <ul>
+     *     <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li>
+     *     <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li>
+     *     <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li>
+     * </ul>
+     * The default value is -1, so the method could block indefinitely; it is therefore recommended to use a timeout value.
      */
-    protected Expression createResourceExpression(RouteContext routeContext) {
-        if (expression != null) {
-            return expression.createExpression(routeContext);
-        } else {
-            return null;
-        }
+    public PollEnrichDefinition timeout(long timeout) {
+        setTimeout(timeout);
+        return this;
     }
 
-    public String getResourceUri() {
-        return resourceUri;
+    /**
+     * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+     * By default Camel will use the reply from the external service as outgoing message.
+     */
+    public PollEnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy);
+        return this;
     }
 
     /**
-     * The endpoint uri for the external service to poll enrich from. You must use either uri or ref.
+     * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+     * By default Camel will use the reply from the external service as outgoing message.
      */
-    public void setResourceUri(String resourceUri) {
-        this.resourceUri = resourceUri;
+    public PollEnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) {
+        setAggregationStrategyRef(aggregationStrategyRef);
+        return this;
     }
 
-    public String getResourceRef() {
-        return resourceRef;
+    /**
+     * This option can be used to explicitly declare the method name to use, when using POJOs as the AggregationStrategy.
+     */
+    public PollEnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) {
+        setAggregationStrategyMethodName(aggregationStrategyMethodName);
+        return this;
+    }
+
+    /**
+     * If this option is false then the aggregate method is not used if there was no data to enrich.
+     * If this option is true then a null value is used as the oldExchange (when no data to enrich),
+     * when using POJOs as the AggregationStrategy.
+     */
+    public PollEnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) {
+        setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull);
+        return this;
     }
 
     /**
-     * Refers to the endpoint for the external service to poll enrich from. You must use either uri or ref.
-     *
-     * @deprecated use uri with ref:uri instead
+     * If this option is false then the aggregate method is not used if there was an exception thrown while trying
+     * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
+     * to do if there was an exception in the aggregate method. For example to suppress the exception
+     * or set a custom message body etc.
      */
-    @Deprecated
-    public void setResourceRef(String resourceRef) {
-        this.resourceRef = resourceRef;
+    public PollEnrichDefinition aggregateOnException(boolean aggregateOnException) {
+        setAggregateOnException(aggregateOnException);
+        return this;
     }
 
+    // Properties
+    // -------------------------------------------------------------------------
+
     public Long getTimeout() {
         return timeout;
     }
 
-    /**
-     * Timeout in millis when polling from the external service.
-     * <p/>
-     * The timeout has influence about the poll enrich behavior. It basically operations in three different modes:
-     * <ul>
-     *     <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li>
-     *     <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li>
-     *     <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li>
-     * </ul>
-     * The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value
-     */
     public void setTimeout(Long timeout) {
         this.timeout = timeout;
     }
@@ -229,10 +201,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return aggregationStrategyRef;
     }
 
-    /**
-     * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
-     * By default Camel will use the reply from the external service as outgoing message.
-     */
     public void setAggregationStrategyRef(String aggregationStrategyRef) {
         this.aggregationStrategyRef = aggregationStrategyRef;
     }
@@ -241,9 +209,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return aggregationStrategyMethodName;
     }
 
-    /**
-     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
-     */
     public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
         this.aggregationStrategyMethodName = aggregationStrategyMethodName;
     }
@@ -252,11 +217,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return aggregationStrategyMethodAllowNull;
     }
 
-    /**
-     * If this option is false then the aggregate method is not used if there was no data to enrich.
-     * If this option is true then null values is used as the oldExchange (when no data to enrich),
-     * when using POJOs as the AggregationStrategy.
-     */
     public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) {
         this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull;
     }
@@ -265,10 +225,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return aggregationStrategy;
     }
 
-    /**
-     * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
-     * By default Camel will use the reply from the external service as outgoing message.
-     */
     public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
     }
@@ -277,26 +233,8 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return aggregateOnException;
     }
 
-    /**
-     * If this option is false then the aggregate method is not used if there was an exception thrown while trying
-     * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
-     * to do if there was an exception in the aggregate method. For example to suppress the exception
-     * or set a custom message body etc.
-     */
     public void setAggregateOnException(Boolean aggregateOnException) {
         this.aggregateOnException = aggregateOnException;
     }
 
-    public ExpressionDefinition getExpression() {
-        return expression;
-    }
-
-    /**
-     * Sets an expression to use for dynamic computing the endpoint to poll from.
-     * <p/>
-     * If this option is set, then <tt>resourceUri</tt> or <tt>resourceRef</tt> is not in use.
-     */
-    public void setExpression(ExpressionDefinition expression) {
-        this.expression = expression;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index b525c0f..589df86 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -52,6 +52,7 @@ import org.apache.camel.builder.ProcessorBuilder;
 import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.LanguageExpression;
+import org.apache.camel.model.language.SimpleExpression;
 import org.apache.camel.model.rest.RestDefinition;
 import org.apache.camel.processor.InterceptEndpointProcessor;
 import org.apache.camel.processor.Pipeline;
@@ -3292,8 +3293,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri) {
-        addOutput(new PollEnrichDefinition(null, resourceUri, -1));
-        return (Type) this;
+        return pollEnrich(resourceUri, null);
     }
 
     /**
@@ -3314,8 +3314,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, AggregationStrategy aggregationStrategy) {
-        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, -1));
-        return (Type) this;
+        return pollEnrich(resourceUri, -1, aggregationStrategy);
     }
 
     /**
@@ -3338,8 +3337,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy) {
-        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout));
-        return (Type) this;
+        return pollEnrich(resourceUri, timeout, aggregationStrategy, false);
     }
 
     /**
@@ -3364,7 +3362,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy, boolean aggregateOnException) {
-        PollEnrichDefinition pollEnrich = new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout);
+        PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
+        pollEnrich.setExpression(new ConstantExpression(resourceUri));
+        pollEnrich.setTimeout(timeout);
+        pollEnrich.setAggregationStrategy(aggregationStrategy);
         pollEnrich.setAggregateOnException(aggregateOnException);
         addOutput(pollEnrich);
         return (Type) this;
@@ -3389,8 +3390,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, long timeout) {
-        addOutput(new PollEnrichDefinition(null, resourceUri, timeout));
-        return (Type) this;
+        return pollEnrich(resourceUri, timeout, null);
     }
 
     /**
@@ -3414,7 +3414,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     @SuppressWarnings("unchecked")
     public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef) {
         PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
-        pollEnrich.setResourceRef(resourceRef);
+        pollEnrich.setExpression(new SimpleExpression("ref:" + resourceRef));
         pollEnrich.setTimeout(timeout);
         pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
         addOutput(pollEnrich);
@@ -3444,7 +3444,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     @SuppressWarnings("unchecked")
     public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef, boolean aggregateOnException) {
         PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
-        pollEnrich.setResourceRef(resourceRef);
+        pollEnrich.setExpression(new SimpleExpression("ref:" + resourceRef));
         pollEnrich.setTimeout(timeout);
         pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
         pollEnrich.setAggregateOnException(aggregateOnException);
@@ -3484,6 +3484,27 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     }
 
     /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+     * <p/>
+     * The difference between this and {@link #enrich(String)} is that this uses a consumer
+     * to obtain the additional data, whereas enrich uses a producer.
+     * <p/>
+     * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+     * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
+     * otherwise we use <tt>receive(timeout)</tt>.
+     *
+     * @return an expression builder clause to set the expression to use for computing the endpoint to poll from
+     * @see org.apache.camel.processor.PollEnricher
+     */
+    public ExpressionClause<PollEnrichDefinition> pollEnrich() {
+        PollEnrichDefinition answer = new PollEnrichDefinition();
+        addOutput(answer);
+        return ExpressionClause.createAndSetExpression(answer);
+    }
+
+    /**
      * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as
      * a callback when the {@link org.apache.camel.Exchange} has finished being processed.
      * The hook invoke callbacks for either onComplete or onFailure.

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index ab313fb..9873cbb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -22,7 +22,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.PollingConsumer;
@@ -50,55 +49,25 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
  *
  * @see Enricher
  */
-public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware, CamelContextAware {
+public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
     private CamelContext camelContext;
     private ConsumerCache consumerCache;
     private String id;
     private AggregationStrategy aggregationStrategy;
-    private final PollingConsumer consumer;
     private final Expression expression;
     private long timeout;
     private boolean aggregateOnException;
 
     /**
-     * Creates a new {@link PollEnricher}. The default aggregation strategy is to
-     * copy the additional data obtained from the enricher's resource over the
-     * input data. When using the copy aggregation strategy the enricher
-     * degenerates to a normal transformer.
-     *
-     * @param consumer consumer to resource endpoint.
-     */
-    public PollEnricher(PollingConsumer consumer) {
-        this(defaultAggregationStrategy(), consumer, 0);
-    }
-
-    /**
      * Creates a new {@link PollEnricher}.
      *
-     * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
-     * @param consumer consumer to resource endpoint.
-     * @param timeout timeout in millis
-     */
-    public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
-        this.aggregationStrategy = aggregationStrategy;
-        this.consumer = consumer;
-        this.expression = null;
-        this.timeout = timeout;
-    }
-
-    /**
-     * Creates a new {@link PollEnricher}.
-     *
-     * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
      * @param expression expression to use to compute the endpoint to poll from.
      * @param timeout timeout in millis
      */
-    public PollEnricher(AggregationStrategy aggregationStrategy, Expression expression, long timeout) {
-        this.aggregationStrategy = aggregationStrategy;
+    public PollEnricher(Expression expression, long timeout) {
         this.expression = expression;
-        this.consumer = null;
         this.timeout = timeout;
     }
 
@@ -118,10 +87,6 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
         this.id = id;
     }
 
-    public Endpoint getEndpoint() {
-        return consumer != null ? consumer.getEndpoint() : null;
-    }
-
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }
@@ -193,34 +158,32 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
         }
 
         // which consumer to use
-        PollingConsumer target = consumer;
-        Endpoint endpoint = null;
+        PollingConsumer consumer;
+        Endpoint endpoint;
 
         // use dynamic endpoint so calculate the endpoint to use
-        if (expression != null) {
-            try {
-                Object recipient = expression.evaluate(exchange, Object.class);
-                endpoint = resolveEndpoint(exchange, recipient);
-                // acquire the consumer from the cache
-                target = consumerCache.acquirePollingConsumer(endpoint);
-            } catch (Throwable e) {
-                exchange.setException(e);
-                callback.done(true);
-                return true;
-            }
+        try {
+            Object recipient = expression.evaluate(exchange, Object.class);
+            endpoint = resolveEndpoint(exchange, recipient);
+            // acquire the consumer from the cache
+            consumer = consumerCache.acquirePollingConsumer(endpoint);
+        } catch (Throwable e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
 
         Exchange resourceExchange;
         try {
             if (timeout < 0) {
-                LOG.debug("Consumer receive: {}", target);
+                LOG.debug("Consumer receive: {}", consumer);
                 resourceExchange = consumer.receive();
             } else if (timeout == 0) {
-                LOG.debug("Consumer receiveNoWait: {}", target);
-                resourceExchange = target.receiveNoWait();
+                LOG.debug("Consumer receiveNoWait: {}", consumer);
+                resourceExchange = consumer.receiveNoWait();
             } else {
-                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, target);
-                resourceExchange = target.receive(timeout);
+                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
+                resourceExchange = consumer.receive(timeout);
             }
 
             if (resourceExchange == null) {
@@ -234,9 +197,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
             return true;
         } finally {
             // return the consumer back to the cache
-            if (expression != null) {
-                consumerCache.releasePollingConsumer(endpoint, target);
-            }
+            consumerCache.releasePollingConsumer(endpoint, consumer);
         }
 
         try {
@@ -262,9 +223,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
 
             // set header with the uri of the endpoint enriched so we can use that for tracing etc
             if (exchange.hasOut()) {
-                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
+                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
             } else {
-                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
+                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
             }
         } catch (Throwable e) {
             exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
@@ -308,23 +269,23 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
 
     @Override
     public String toString() {
-        return "PollEnrich[" + consumer + "]";
+        return "PollEnrich[" + expression + "]";
     }
 
     protected void doStart() throws Exception {
-        if (expression != null && consumerCache == null) {
+        if (consumerCache == null) {
             // create consumer cache if we use dynamic expressions for computing the endpoints to poll
             consumerCache = new ConsumerCache(this, getCamelContext());
         }
-        ServiceHelper.startServices(consumerCache, consumer, aggregationStrategy);
+        ServiceHelper.startServices(consumerCache, aggregationStrategy);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(consumerCache, consumer, aggregationStrategy);
+        ServiceHelper.stopServices(aggregationStrategy, consumerCache);
     }
 
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(consumerCache, consumer, aggregationStrategy);
+        ServiceHelper.stopAndShutdownServices(aggregationStrategy, consumerCache);
     }
 
     private static class CopyAggregationStrategy implements AggregationStrategy {

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
index 4e983a7..38a42ab 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
@@ -39,7 +39,7 @@ public class PollEnrichExpressionTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .pollEnrich(header("source"), 1000, null, false)
+                    .pollEnrich().header("source")
                     .to("mock:result");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
new file mode 100644
index 0000000..8017e8d
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.enricher.PollEnrichExpressionTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringPollEnrichExpressionTest extends PollEnrichExpressionTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/pollEnrichExpressionTest.xml");
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
new file mode 100644
index 0000000..cb932e3
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+    <!-- START SNIPPET: e1 -->
+    <route>
+      <from uri="direct:start"/>
+      <pollEnrich>
+        <header>source</header>
+      </pollEnrich>
+      <to uri="mock:result"/>
+    </route>
+    <!-- END SNIPPET: e1 -->
+  </camelContext>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
index f63b63a..f0f71ce 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
@@ -22,34 +22,42 @@
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
 
-    <camelContext xmlns="http://camel.apache.org/schema/spring">
-        <!-- START SNIPPET: e1 -->
-        <route>
-            <from uri="direct:enricher-test-1"/>
-            <pollEnrich uri="seda:foo1" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-        <!-- END SNIPPET: e1 -->
-
-        <route>
-            <from uri="direct:enricher-test-2"/>
-            <pollEnrich uri="seda:foo2" timeout="1000" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-
-        <route>
-            <from uri="direct:enricher-test-3"/>
-            <pollEnrich uri="seda:foo3" timeout="-1" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-
-        <route>
-            <from uri="direct:enricher-test-4"/>
-            <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-    </camelContext>
-
-    <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+    <!-- START SNIPPET: e1 -->
+    <route>
+      <from uri="direct:enricher-test-1"/>
+      <pollEnrich strategyRef="sampleAggregator">
+        <constant>seda:foo1</constant>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+    <!-- END SNIPPET: e1 -->
+
+    <route>
+      <from uri="direct:enricher-test-2"/>
+      <pollEnrich timeout="1000" strategyRef="sampleAggregator">
+        <constant>seda:foo2</constant>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+
+    <route>
+      <from uri="direct:enricher-test-3"/>
+      <pollEnrich timeout="-1" strategyRef="sampleAggregator">
+        <constant>seda:foo3</constant>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+
+    <route>
+      <from uri="direct:enricher-test-4"/>
+      <pollEnrich strategyRef="sampleAggregator">
+        <constant>seda:foo4</constant>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+  </camelContext>
+
+  <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
 
 </beans>

http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
index 6046e7d..c0e0c0b 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
@@ -22,40 +22,48 @@
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
 
-    <camelContext xmlns="http://camel.apache.org/schema/spring">
-
-        <endpoint id="foo1" uri="seda:foo1"/>
-        <endpoint id="foo2" uri="seda:foo2"/>
-        <endpoint id="foo3" uri="seda:foo3"/>
-        <endpoint id="foo4" uri="seda:foo4"/>
-
-        <!-- START SNIPPET: e1 -->
-        <route>
-            <from uri="direct:enricher-test-1"/>
-            <pollEnrich ref="foo1" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-        <!-- END SNIPPET: e1 -->
-
-        <route>
-            <from uri="direct:enricher-test-2"/>
-            <pollEnrich ref="foo2" timeout="1000" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-
-        <route>
-            <from uri="direct:enricher-test-3"/>
-            <pollEnrich ref="foo3" timeout="-1" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-
-        <route>
-            <from uri="direct:enricher-test-4"/>
-            <pollEnrich ref="foo4" strategyRef="sampleAggregator"/>
-            <to uri="mock:mock"/>
-        </route>
-    </camelContext>
-
-    <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+
+    <endpoint id="foo1" uri="seda:foo1"/>
+    <endpoint id="foo2" uri="seda:foo2"/>
+    <endpoint id="foo3" uri="seda:foo3"/>
+    <endpoint id="foo4" uri="seda:foo4"/>
+
+    <!-- START SNIPPET: e1 -->
+    <route>
+      <from uri="direct:enricher-test-1"/>
+      <pollEnrich strategyRef="sampleAggregator">
+        <ref>foo1</ref>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+    <!-- END SNIPPET: e1 -->
+
+    <route>
+      <from uri="direct:enricher-test-2"/>
+      <pollEnrich timeout="1000" strategyRef="sampleAggregator">
+        <ref>foo2</ref>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+
+    <route>
+      <from uri="direct:enricher-test-3"/>
+      <pollEnrich timeout="-1" strategyRef="sampleAggregator">
+        <ref>foo3</ref>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+
+    <route>
+      <from uri="direct:enricher-test-4"/>
+      <pollEnrich strategyRef="sampleAggregator">
+        <ref>foo4</ref>
+      </pollEnrich>
+      <to uri="mock:mock"/>
+    </route>
+  </camelContext>
+
+  <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
 
 </beans>