You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/13 14:37:53 UTC
[02/10] camel git commit: CAMEL-4596: pollEnrich supports dynamic
uris.
CAMEL-4596: pollEnrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b5be4d6e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b5be4d6e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b5be4d6e
Branch: refs/heads/master
Commit: b5be4d6ed829700ca9af852940dcf28f3c00b598
Parents: 9fd4d54
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 09:34:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 10:06:13 2015 +0200
----------------------------------------------------------------------
.../camel/model/PollEnrichDefinition.java | 184 ++++++-------------
.../apache/camel/model/ProcessorDefinition.java | 43 +++--
.../apache/camel/processor/PollEnricher.java | 91 +++------
.../enricher/PollEnrichExpressionTest.java | 2 +-
.../SpringPollEnrichExpressionTest.java | 30 +++
.../processor/pollEnrichExpressionTest.xml | 37 ++++
.../camel/spring/processor/pollEnricher.xml | 66 ++++---
.../camel/spring/processor/pollEnricherRef.xml | 78 ++++----
8 files changed, 267 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index a5d7cb1..eb1247b 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -19,22 +19,17 @@ package org.apache.camel.model;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
import org.apache.camel.Expression;
-import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
-import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.PollEnricher;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.ObjectHelper;
/**
* Enriches messages with data polled from a secondary resource
@@ -44,15 +39,7 @@ import org.apache.camel.util.ObjectHelper;
@Metadata(label = "eip,transformation")
@XmlRootElement(name = "pollEnrich")
@XmlAccessorType(XmlAccessType.FIELD)
-public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinition> implements EndpointRequiredDefinition {
- @XmlElementRef
- private ExpressionDefinition expression;
- @XmlAttribute(name = "uri")
- private String resourceUri;
- // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref:
- @XmlAttribute(name = "ref")
- @Deprecated
- private String resourceRef;
+public class PollEnrichDefinition extends NoOutputExpressionNode {
@XmlAttribute @Metadata(defaultValue = "-1")
private Long timeout;
@XmlAttribute(name = "strategyRef")
@@ -69,65 +56,29 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
public PollEnrichDefinition() {
}
- public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout) {
+ public PollEnrichDefinition(AggregationStrategy aggregationStrategy, long timeout) {
this.aggregationStrategy = aggregationStrategy;
- this.resourceUri = resourceUri;
this.timeout = timeout;
}
@Override
public String toString() {
- return "PollEnrich[" + description() + " " + aggregationStrategy + "]";
+ return "PollEnrich[" + getExpression() + "]";
}
- protected String description() {
- return FromDefinition.description(getResourceUri(), getResourceRef(), (Endpoint) null);
- }
-
@Override
public String getLabel() {
- return "pollEnrich[" + description() + "]";
- }
-
- @Override
- public String getEndpointUri() {
- if (resourceUri != null) {
- return resourceUri;
- } else {
- return null;
- }
+ return "pollEnrich[" + getExpression() + "]";
}
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
- if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef) && expression == null) {
- throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
- }
-
- // lookup endpoint
- PollingConsumer consumer = null;
- if (resourceUri != null) {
- Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
- consumer = endpoint.createPollingConsumer();
- } else if (resourceRef != null) {
- Endpoint endpoint = routeContext.resolveEndpoint(null, resourceRef);
- consumer = endpoint.createPollingConsumer();
- }
// if no timeout then we should block, and there use a negative timeout
long time = timeout != null ? timeout : -1;
+ Expression exp = getExpression().createExpression(routeContext);
- // create the expression if any was configured
- Expression exp = createResourceExpression(routeContext);
-
- PollEnricher enricher;
- if (exp != null) {
- enricher = new PollEnricher(null, exp, time);
- } else if (consumer != null) {
- enricher = new PollEnricher(null, consumer, time);
- } else {
- throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
- }
+ PollEnricher enricher = new PollEnricher(exp, time);
AggregationStrategy strategy = createAggregationStrategy(routeContext);
if (strategy == null) {
@@ -167,60 +118,81 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return strategy;
}
+ // Fluent API
+ // -------------------------------------------------------------------------
+
+ // TODO: add cacheSize option
+
/**
- * Creates the {@link org.apache.camel.Expression} from the expression node to use to compute the endpoint to poll from.
- *
- * @param routeContext the route context
- * @return the created expression, or <tt>null</tt> if no expression configured
+ * Timeout in millis when polling from the external service.
+ * <p/>
+ * The timeout has influence about the poll enrich behavior. It basically operations in three different modes:
+ * <ul>
+ * <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li>
+ * <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li>
+ * <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li>
+ * </ul>
+ * The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value
*/
- protected Expression createResourceExpression(RouteContext routeContext) {
- if (expression != null) {
- return expression.createExpression(routeContext);
- } else {
- return null;
- }
+ public PollEnrichDefinition timeout(long timeout) {
+ setTimeout(timeout);
+ return this;
}
- public String getResourceUri() {
- return resourceUri;
+ /**
+ * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+ * By default Camel will use the reply from the external service as outgoing message.
+ */
+ public PollEnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy);
+ return this;
}
/**
- * The endpoint uri for the external service to poll enrich from. You must use either uri or ref.
+ * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+ * By default Camel will use the reply from the external service as outgoing message.
*/
- public void setResourceUri(String resourceUri) {
- this.resourceUri = resourceUri;
+ public PollEnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) {
+ setAggregationStrategyRef(aggregationStrategyRef);
+ return this;
}
- public String getResourceRef() {
- return resourceRef;
+ /**
+ * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
+ */
+ public PollEnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) {
+ setAggregationStrategyMethodName(aggregationStrategyMethodName);
+ return this;
+ }
+
+ /**
+ * If this option is false then the aggregate method is not used if there was no data to enrich.
+ * If this option is true then null values is used as the oldExchange (when no data to enrich),
+ * when using POJOs as the AggregationStrategy.
+ */
+ public PollEnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) {
+ setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull);
+ return this;
}
/**
- * Refers to the endpoint for the external service to poll enrich from. You must use either uri or ref.
- *
- * @deprecated use uri with ref:uri instead
+ * If this option is false then the aggregate method is not used if there was an exception thrown while trying
+ * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
+ * to do if there was an exception in the aggregate method. For example to suppress the exception
+ * or set a custom message body etc.
*/
- @Deprecated
- public void setResourceRef(String resourceRef) {
- this.resourceRef = resourceRef;
+ public PollEnrichDefinition aggregateOnException(boolean aggregateOnException) {
+ setAggregateOnException(aggregateOnException);
+ return this;
}
+ // Properties
+ // -------------------------------------------------------------------------
+
public Long getTimeout() {
return timeout;
}
- /**
- * Timeout in millis when polling from the external service.
- * <p/>
- * The timeout has influence about the poll enrich behavior. It basically operations in three different modes:
- * <ul>
- * <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li>
- * <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li>
- * <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li>
- * </ul>
- * The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value
- */
public void setTimeout(Long timeout) {
this.timeout = timeout;
}
@@ -229,10 +201,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return aggregationStrategyRef;
}
- /**
- * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
- * By default Camel will use the reply from the external service as outgoing message.
- */
public void setAggregationStrategyRef(String aggregationStrategyRef) {
this.aggregationStrategyRef = aggregationStrategyRef;
}
@@ -241,9 +209,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return aggregationStrategyMethodName;
}
- /**
- * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
- */
public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
this.aggregationStrategyMethodName = aggregationStrategyMethodName;
}
@@ -252,11 +217,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return aggregationStrategyMethodAllowNull;
}
- /**
- * If this option is false then the aggregate method is not used if there was no data to enrich.
- * If this option is true then null values is used as the oldExchange (when no data to enrich),
- * when using POJOs as the AggregationStrategy.
- */
public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) {
this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull;
}
@@ -265,10 +225,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return aggregationStrategy;
}
- /**
- * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
- * By default Camel will use the reply from the external service as outgoing message.
- */
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
@@ -277,26 +233,8 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return aggregateOnException;
}
- /**
- * If this option is false then the aggregate method is not used if there was an exception thrown while trying
- * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
- * to do if there was an exception in the aggregate method. For example to suppress the exception
- * or set a custom message body etc.
- */
public void setAggregateOnException(Boolean aggregateOnException) {
this.aggregateOnException = aggregateOnException;
}
- public ExpressionDefinition getExpression() {
- return expression;
- }
-
- /**
- * Sets an expression to use for dynamic computing the endpoint to poll from.
- * <p/>
- * If this option is set, then <tt>resourceUri</tt> or <tt>resourceRef</tt> is not in use.
- */
- public void setExpression(ExpressionDefinition expression) {
- this.expression = expression;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index b525c0f..589df86 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -52,6 +52,7 @@ import org.apache.camel.builder.ProcessorBuilder;
import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.model.language.LanguageExpression;
+import org.apache.camel.model.language.SimpleExpression;
import org.apache.camel.model.rest.RestDefinition;
import org.apache.camel.processor.InterceptEndpointProcessor;
import org.apache.camel.processor.Pipeline;
@@ -3292,8 +3293,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri) {
- addOutput(new PollEnrichDefinition(null, resourceUri, -1));
- return (Type) this;
+ return pollEnrich(resourceUri, null);
}
/**
@@ -3314,8 +3314,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, AggregationStrategy aggregationStrategy) {
- addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, -1));
- return (Type) this;
+ return pollEnrich(resourceUri, -1, aggregationStrategy);
}
/**
@@ -3338,8 +3337,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy) {
- addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout));
- return (Type) this;
+ return pollEnrich(resourceUri, timeout, aggregationStrategy, false);
}
/**
@@ -3364,7 +3362,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy, boolean aggregateOnException) {
- PollEnrichDefinition pollEnrich = new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout);
+ PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
+ pollEnrich.setExpression(new ConstantExpression(resourceUri));
+ pollEnrich.setTimeout(timeout);
+ pollEnrich.setAggregationStrategy(aggregationStrategy);
pollEnrich.setAggregateOnException(aggregateOnException);
addOutput(pollEnrich);
return (Type) this;
@@ -3389,8 +3390,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, long timeout) {
- addOutput(new PollEnrichDefinition(null, resourceUri, timeout));
- return (Type) this;
+ return pollEnrich(resourceUri, timeout, null);
}
/**
@@ -3414,7 +3414,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
@SuppressWarnings("unchecked")
public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef) {
PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
- pollEnrich.setResourceRef(resourceRef);
+ pollEnrich.setExpression(new SimpleExpression("ref:" + resourceRef));
pollEnrich.setTimeout(timeout);
pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
addOutput(pollEnrich);
@@ -3444,7 +3444,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
@SuppressWarnings("unchecked")
public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef, boolean aggregateOnException) {
PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
- pollEnrich.setResourceRef(resourceRef);
+ pollEnrich.setExpression(new SimpleExpression("ref:" + resourceRef));
pollEnrich.setTimeout(timeout);
pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
pollEnrich.setAggregateOnException(aggregateOnException);
@@ -3484,6 +3484,27 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
}
/**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+ * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+ * <p/>
+ * The difference between this and {@link #enrich(String)} is that this uses a consumer
+ * to obtain the additional data, where as enrich uses a producer.
+ * <p/>
+ * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+ * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
+ * otherwise we use <tt>receive(timeout)</tt>.
+ *
+ * @return a expression builder clause to set the expression to use for computing the endpoint to poll from
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ public ExpressionClause<PollEnrichDefinition> pollEnrich() {
+ PollEnrichDefinition answer = new PollEnrichDefinition();
+ addOutput(answer);
+ return ExpressionClause.createAndSetExpression(answer);
+ }
+
+ /**
* Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as
* a callback when the {@link org.apache.camel.Exchange} has finished being processed.
* The hook invoke callbacks for either onComplete or onFailure.
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index ab313fb..9873cbb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -22,7 +22,6 @@ import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointAware;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.PollingConsumer;
@@ -50,55 +49,25 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
*
* @see Enricher
*/
-public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware, CamelContextAware {
+public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
private CamelContext camelContext;
private ConsumerCache consumerCache;
private String id;
private AggregationStrategy aggregationStrategy;
- private final PollingConsumer consumer;
private final Expression expression;
private long timeout;
private boolean aggregateOnException;
/**
- * Creates a new {@link PollEnricher}. The default aggregation strategy is to
- * copy the additional data obtained from the enricher's resource over the
- * input data. When using the copy aggregation strategy the enricher
- * degenerates to a normal transformer.
- *
- * @param consumer consumer to resource endpoint.
- */
- public PollEnricher(PollingConsumer consumer) {
- this(defaultAggregationStrategy(), consumer, 0);
- }
-
- /**
* Creates a new {@link PollEnricher}.
*
- * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
- * @param consumer consumer to resource endpoint.
- * @param timeout timeout in millis
- */
- public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
- this.aggregationStrategy = aggregationStrategy;
- this.consumer = consumer;
- this.expression = null;
- this.timeout = timeout;
- }
-
- /**
- * Creates a new {@link PollEnricher}.
- *
- * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
* @param expression expression to use to compute the endpoint to poll from.
* @param timeout timeout in millis
*/
- public PollEnricher(AggregationStrategy aggregationStrategy, Expression expression, long timeout) {
- this.aggregationStrategy = aggregationStrategy;
+ public PollEnricher(Expression expression, long timeout) {
this.expression = expression;
- this.consumer = null;
this.timeout = timeout;
}
@@ -118,10 +87,6 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
this.id = id;
}
- public Endpoint getEndpoint() {
- return consumer != null ? consumer.getEndpoint() : null;
- }
-
public AggregationStrategy getAggregationStrategy() {
return aggregationStrategy;
}
@@ -193,34 +158,32 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
}
// which consumer to use
- PollingConsumer target = consumer;
- Endpoint endpoint = null;
+ PollingConsumer consumer;
+ Endpoint endpoint;
// use dynamic endpoint so calculate the endpoint to use
- if (expression != null) {
- try {
- Object recipient = expression.evaluate(exchange, Object.class);
- endpoint = resolveEndpoint(exchange, recipient);
- // acquire the consumer from the cache
- target = consumerCache.acquirePollingConsumer(endpoint);
- } catch (Throwable e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
+ try {
+ Object recipient = expression.evaluate(exchange, Object.class);
+ endpoint = resolveEndpoint(exchange, recipient);
+ // acquire the consumer from the cache
+ consumer = consumerCache.acquirePollingConsumer(endpoint);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
}
Exchange resourceExchange;
try {
if (timeout < 0) {
- LOG.debug("Consumer receive: {}", target);
+ LOG.debug("Consumer receive: {}", consumer);
resourceExchange = consumer.receive();
} else if (timeout == 0) {
- LOG.debug("Consumer receiveNoWait: {}", target);
- resourceExchange = target.receiveNoWait();
+ LOG.debug("Consumer receiveNoWait: {}", consumer);
+ resourceExchange = consumer.receiveNoWait();
} else {
- LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, target);
- resourceExchange = target.receive(timeout);
+ LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
+ resourceExchange = consumer.receive(timeout);
}
if (resourceExchange == null) {
@@ -234,9 +197,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
return true;
} finally {
// return the consumer back to the cache
- if (expression != null) {
- consumerCache.releasePollingConsumer(endpoint, target);
- }
+ consumerCache.releasePollingConsumer(endpoint, consumer);
}
try {
@@ -262,9 +223,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
// set header with the uri of the endpoint enriched so we can use that for tracing etc
if (exchange.hasOut()) {
- exchange.getOut().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
+ exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
} else {
- exchange.getIn().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
+ exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
}
} catch (Throwable e) {
exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
@@ -308,23 +269,23 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
@Override
public String toString() {
- return "PollEnrich[" + consumer + "]";
+ return "PollEnrich[" + expression + "]";
}
protected void doStart() throws Exception {
- if (expression != null && consumerCache == null) {
+ if (consumerCache == null) {
// create consumer cache if we use dynamic expressions for computing the endpoints to poll
consumerCache = new ConsumerCache(this, getCamelContext());
}
- ServiceHelper.startServices(consumerCache, consumer, aggregationStrategy);
+ ServiceHelper.startServices(consumerCache, aggregationStrategy);
}
protected void doStop() throws Exception {
- ServiceHelper.stopServices(consumerCache, consumer, aggregationStrategy);
+ ServiceHelper.stopServices(aggregationStrategy, consumerCache);
}
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(consumerCache, consumer, aggregationStrategy);
+ ServiceHelper.stopAndShutdownServices(aggregationStrategy, consumerCache);
}
private static class CopyAggregationStrategy implements AggregationStrategy {
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
index 4e983a7..38a42ab 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
@@ -39,7 +39,7 @@ public class PollEnrichExpressionTest extends ContextTestSupport {
@Override
public void configure() throws Exception {
from("direct:start")
- .pollEnrich(header("source"), 1000, null, false)
+ .pollEnrich().header("source")
.to("mock:result");
}
};
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
new file mode 100644
index 0000000..8017e8d
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.enricher.PollEnrichExpressionTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringPollEnrichExpressionTest extends PollEnrichExpressionTest {
+
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/pollEnrichExpressionTest.xml");
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
new file mode 100644
index 0000000..cb932e3
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ ">
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <!-- START SNIPPET: e1 -->
+ <route>
+ <from uri="direct:start"/>
+ <pollEnrich>
+ <header>source</header>
+ </pollEnrich>
+ <to uri="mock:result"/>
+ </route>
+ <!-- END SNIPPET: e1 -->
+ </camelContext>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
index f63b63a..f0f71ce 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
@@ -22,34 +22,42 @@
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
">
- <camelContext xmlns="http://camel.apache.org/schema/spring">
- <!-- START SNIPPET: e1 -->
- <route>
- <from uri="direct:enricher-test-1"/>
- <pollEnrich uri="seda:foo1" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
- <!-- END SNIPPET: e1 -->
-
- <route>
- <from uri="direct:enricher-test-2"/>
- <pollEnrich uri="seda:foo2" timeout="1000" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
-
- <route>
- <from uri="direct:enricher-test-3"/>
- <pollEnrich uri="seda:foo3" timeout="-1" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
-
- <route>
- <from uri="direct:enricher-test-4"/>
- <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
- </camelContext>
-
- <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <!-- START SNIPPET: e1 -->
+ <route>
+ <from uri="direct:enricher-test-1"/>
+ <pollEnrich strategyRef="sampleAggregator">
+ <constant>seda:foo1</constant>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+ <!-- END SNIPPET: e1 -->
+
+ <route>
+ <from uri="direct:enricher-test-2"/>
+ <pollEnrich timeout="1000" strategyRef="sampleAggregator">
+ <constant>seda:foo2</constant>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-3"/>
+ <pollEnrich timeout="-1" strategyRef="sampleAggregator">
+ <constant>seda:foo3</constant>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-4"/>
+ <pollEnrich strategyRef="sampleAggregator">
+ <constant>seda:foo4</constant>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+ </camelContext>
+
+ <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
</beans>
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
index 6046e7d..c0e0c0b 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
@@ -22,40 +22,48 @@
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
">
- <camelContext xmlns="http://camel.apache.org/schema/spring">
-
- <endpoint id="foo1" uri="seda:foo1"/>
- <endpoint id="foo2" uri="seda:foo2"/>
- <endpoint id="foo3" uri="seda:foo3"/>
- <endpoint id="foo4" uri="seda:foo4"/>
-
- <!-- START SNIPPET: e1 -->
- <route>
- <from uri="direct:enricher-test-1"/>
- <pollEnrich ref="foo1" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
- <!-- END SNIPPET: e1 -->
-
- <route>
- <from uri="direct:enricher-test-2"/>
- <pollEnrich ref="foo2" timeout="1000" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
-
- <route>
- <from uri="direct:enricher-test-3"/>
- <pollEnrich ref="foo3" timeout="-1" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
-
- <route>
- <from uri="direct:enricher-test-4"/>
- <pollEnrich ref="foo4" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
- </camelContext>
-
- <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+
+ <endpoint id="foo1" uri="seda:foo1"/>
+ <endpoint id="foo2" uri="seda:foo2"/>
+ <endpoint id="foo3" uri="seda:foo3"/>
+ <endpoint id="foo4" uri="seda:foo4"/>
+
+ <!-- START SNIPPET: e1 -->
+ <route>
+ <from uri="direct:enricher-test-1"/>
+ <pollEnrich strategyRef="sampleAggregator">
+ <ref>foo1</ref>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+ <!-- END SNIPPET: e1 -->
+
+ <route>
+ <from uri="direct:enricher-test-2"/>
+ <pollEnrich timeout="1000" strategyRef="sampleAggregator">
+ <ref>foo2</ref>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-3"/>
+ <pollEnrich timeout="-1" strategyRef="sampleAggregator">
+ <ref>foo3</ref>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-4"/>
+ <pollEnrich strategyRef="sampleAggregator">
+ <ref>foo4</ref>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+ </camelContext>
+
+ <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
</beans>