You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/29 13:43:38 UTC
svn commit: r789294 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/resources/org/apache/camel/model/
camel-core/src/test/java/org/apache/camel/processor/enric...
Author: davsclaus
Date: Mon Jun 29 11:43:38 2009
New Revision: 789294
URL: http://svn.apache.org/viewvc?rev=789294&view=rev
Log:
CAMEL-1694: added pollEnrich to DSL to enrich using a polling consumer. For instance to poll a file or download a FTP file.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
- copied, changed from r789205, camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
- copied, changed from r789205, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
- copied, changed from r789205, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java
- copied, changed from r789205, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
- copied, changed from r789205, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java
Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java (from r789205, camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java Mon Jun 29 11:43:38 2009
@@ -24,55 +24,62 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
-import org.apache.camel.processor.Enricher;
+import org.apache.camel.processor.PollEnricher;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.RouteContext;
/**
- * Represents an XML <enrich/> element
+ * Represents an XML <pollEnrich/> element
*
- * @see Enricher
+ * @see org.apache.camel.processor.PollEnricher
*/
-@XmlRootElement(name = "enrich")
+@XmlRootElement(name = "pollEnrich")
@XmlAccessorType(XmlAccessType.FIELD)
-public class EnrichDefinition extends OutputDefinition<EnrichDefinition> {
+public class PollEnrichDefinition extends OutputDefinition<PollEnrichDefinition> {
@XmlAttribute(name = "uri", required = true)
private String resourceUri;
-
- @XmlAttribute(name = "strategyRef", required = false)
+
+ @XmlAttribute(name = "timeout")
+ private Long timeout;
+
+ @XmlAttribute(name = "strategyRef")
private String aggregationStrategyRef;
-
+
@XmlTransient
private AggregationStrategy aggregationStrategy;
-
- public EnrichDefinition() {
- this(null, null);
- }
-
- public EnrichDefinition(String resourceUri) {
- this(null, resourceUri);
+
+ public PollEnrichDefinition() {
+ this(null, null, 0);
}
-
- public EnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri) {
+
+ public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout) {
this.aggregationStrategy = aggregationStrategy;
this.resourceUri = resourceUri;
+ this.timeout = timeout;
}
-
+
@Override
public String toString() {
- return "Enrich[" + resourceUri + " " + aggregationStrategy + "]";
+ return "PollEnrich[" + resourceUri + " " + aggregationStrategy + "]";
}
@Override
public String getShortName() {
- return "enrich";
+ return "pollEnrich";
}
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
- Enricher enricher = new Enricher(null, endpoint.createProducer());
+
+ PollEnricher enricher;
+ if (timeout != null) {
+ enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout);
+ } else {
+ enricher = new PollEnricher(null, endpoint.createPollingConsumer(), 0);
+ }
+
if (aggregationStrategyRef != null) {
aggregationStrategy = routeContext.lookup(aggregationStrategyRef, AggregationStrategy.class);
}
@@ -81,7 +88,8 @@
} else {
enricher.setAggregationStrategy(aggregationStrategy);
}
+
return enricher;
}
-
-}
+
+}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Mon Jun 29 11:43:38 2009
@@ -1806,8 +1806,8 @@
}
/**
- * Enriches an exchange with additional data obtained from a
- * <code>resourceUri</code>.
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
*
* @param resourceUri URI of resource endpoint for obtaining additional data.
* @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
@@ -1821,8 +1821,11 @@
}
/**
- * Enriches an exchange with additional data obtained from a
- * <code>resourceUri</code>.
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
+ * <p/>
+ * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
+ * to obtain the additional data, whereas pollEnrich uses a polling consumer.
*
* @param resourceUri URI of resource endpoint for obtaining additional data.
* @return the builder
@@ -1835,6 +1838,96 @@
}
/**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+ * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+ * <p/>
+ * The difference between this and {@link #enrich(String)} is that this uses a consumer
+ * to obtain the additional data, whereas enrich uses a producer.
+ * <p/>
+ * This method uses a timeout of 0, so it polls with <tt>receiveNoWait</tt> and does not block when no
+ * data is available; use the method with a timeout if you need to wait for data from the resourceUri.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining additional data.
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(String resourceUri) {
+ addOutput(new PollEnrichDefinition(null, resourceUri, 0));
+ return (Type) this;
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+ * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+ * <p/>
+ * The difference between this and {@link #enrich(String)} is that this uses a consumer
+ * to obtain the additional data, whereas enrich uses a producer.
+ * <p/>
+ * This method uses a timeout of 0, so it polls with <tt>receiveNoWait</tt> and does not block when no
+ * data is available; use the method with a timeout if you need to wait for data from the resourceUri.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining additional data.
+ * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(String resourceUri, AggregationStrategy aggregationStrategy) {
+ addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, 0));
+ return (Type) this;
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+ * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+ * <p/>
+ * The difference between this and {@link #enrich(String)} is that this uses a consumer
+ * to obtain the additional data, whereas enrich uses a producer.
+ * <p/>
+ * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+ * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
+ * otherwise we use <tt>receive(timeout)</tt>.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining additional data.
+ * @param timeout timeout in millis to wait at most for data to be available.
+ * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy) {
+ addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout));
+ return (Type) this;
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+ * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+ * <p/>
+ * The difference between this and {@link #enrich(String)} is that this uses a consumer
+ * to obtain the additional data, whereas enrich uses a producer.
+ * <p/>
+ * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+ * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
+ * otherwise we use <tt>receive(timeout)</tt>.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining additional data.
+ * @param timeout timeout in millis to wait at most for data to be available.
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(String resourceUri, long timeout) {
+ addOutput(new PollEnrichDefinition(null, resourceUri, timeout));
+ return (Type) this;
+ }
+
+ /**
+ * Adds an onCompletion {@link org.apache.camel.spi.Synchronization} hook that invokes this route as
* a callback when the {@link org.apache.camel.Exchange} has finished being processed.
* The hook invoke callbacks for either onComplete or onFailure.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Mon Jun 29 11:43:38 2009
@@ -35,6 +35,11 @@
* and second by aggregating input data and additional data. Aggregation of
* input data and additional data is delegated to an {@link AggregationStrategy}
* object.
+ * <p/>
+ * Uses a {@link org.apache.camel.Producer} to obtain the additional data as opposed to {@link PollEnricher}
+ * that uses a {@link org.apache.camel.PollingConsumer}.
+ *
+ * @see PollEnricher
*/
public class Enricher extends ServiceSupport implements Processor {
Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java (from r789205, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java Mon Jun 29 11:43:38 2009
@@ -18,55 +18,61 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
-import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
/**
* A content enricher that enriches input data by first obtaining additional
* data from a <i>resource</i> represented by an endpoint <code>producer</code>
* and second by aggregating input data and additional data. Aggregation of
- * input data and additional data is delegated to an {@link AggregationStrategy}
+ * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy}
* object.
+ * <p/>
+ * Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher}
+ * that uses a {@link org.apache.camel.Producer}.
+ *
+ * @see Enricher
*/
-public class Enricher extends ServiceSupport implements Processor {
+public class PollEnricher extends ServiceSupport implements Processor {
- private static final transient Log LOG = LogFactory.getLog(Enricher.class);
+ private static final transient Log LOG = LogFactory.getLog(PollEnricher.class);
private AggregationStrategy aggregationStrategy;
- private Producer producer;
+ private PollingConsumer consumer;
+ private long timeout;
/**
- * Creates a new {@link Enricher}. The default aggregation strategy is to
+ * Creates a new {@link PollEnricher}. The default aggregation strategy is to
* copy the additional data obtained from the enricher's resource over the
* input data. When using the copy aggregation strategy the enricher
* degenerates to a normal transformer.
- *
- * @param producer producer to resource endpoint.
+ *
+ * @param consumer consumer to resource endpoint.
*/
- public Enricher(Producer producer) {
- this(defaultAggregationStrategy(), producer);
+ public PollEnricher(PollingConsumer consumer) {
+ this(defaultAggregationStrategy(), consumer, 0);
}
/**
- * Creates a new {@link Enricher}.
- *
+ * Creates a new {@link PollEnricher}.
+ *
* @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
- * @param producer producer to resource endpoint.
+ * @param consumer consumer to resource endpoint.
*/
- public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
+ public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
this.aggregationStrategy = aggregationStrategy;
- this.producer = producer;
+ this.consumer = consumer;
+ this.timeout = timeout;
}
/**
- * Sets the aggregation strategy for this enricher.
+ * Sets the aggregation strategy for this poll enricher.
*
* @param aggregationStrategy the aggregationStrategy to set
*/
@@ -75,29 +81,55 @@
}
/**
- * Sets the default aggregation strategy for this enricher.
+ * Sets the default aggregation strategy for this poll enricher.
*/
public void setDefaultAggregationStrategy() {
this.aggregationStrategy = defaultAggregationStrategy();
}
/**
+ * Sets the timeout to use when polling.
+ * <p/>
+ * Use a negative value to block until data is available, 0 to poll without waiting, or a positive value to wait at most that many millis.
+ *
+ * @param timeout timeout in millis.
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
* Enriches the input data (<code>exchange</code>) by first obtaining
* additional data from an endpoint represented by an endpoint
* <code>producer</code> and second by aggregating input data and additional
* data. Aggregation of input data and additional data is delegated to an
- * {@link AggregationStrategy} object set at construction time. If the
+ * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the
* message exchange with the resource endpoint fails then no aggregation
* will be done and the failed exchange content is copied over to the
* original message exchange.
- *
+ *
* @param exchange input data.
*/
public void process(Exchange exchange) throws Exception {
- Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
- producer.process(resourceExchange);
+ Exchange resourceExchange;
+ if (timeout < 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Consumer receive: " + consumer);
+ }
+ resourceExchange = consumer.receive();
+ } else if (timeout == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Consumer receiveNoWait: " + consumer);
+ }
+ resourceExchange = consumer.receiveNoWait();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Consumer receive with timeout: " + timeout + " ms. " + consumer);
+ }
+ resourceExchange = consumer.receive(timeout);
+ }
- if (resourceExchange.isFailed()) {
+ if (resourceExchange != null && resourceExchange.isFailed()) {
// copy resource exchange onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, resourceExchange);
} else {
@@ -105,7 +137,10 @@
// aggregate original exchange and resource exchange
// but do not aggregate if the resource exchange was filtered
- Boolean filtered = resourceExchange.getProperty(Exchange.FILTERED, Boolean.class);
+ Boolean filtered = null;
+ if (resourceExchange != null) {
+ filtered = resourceExchange.getProperty(Exchange.FILTERED, Boolean.class);
+ }
if (filtered == null || !filtered) {
// prepare the exchanges for aggregation
ExchangeHelper.prepareAggregation(exchange, resourceExchange);
@@ -121,7 +156,7 @@
}
/**
- * Creates a new {@link DefaultExchange} instance from the given
+ * Creates a new {@link org.apache.camel.impl.DefaultExchange} instance from the given
* <code>exchange</code>. The resulting exchange's pattern is defined by
* <code>pattern</code>.
*
@@ -148,15 +183,15 @@
@Override
public String toString() {
- return "Enrich[" + producer.getEndpoint().getEndpointUri() + "]";
+ return "PollEnrich[" + consumer + "]";
}
protected void doStart() throws Exception {
- producer.start();
+ consumer.start();
}
protected void doStop() throws Exception {
- producer.stop();
+ consumer.stop();
}
private static class CopyAggregationStrategy implements AggregationStrategy {
@@ -168,4 +203,4 @@
}
-}
+}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Mon Jun 29 11:43:38 2009
@@ -45,6 +45,7 @@
PackageScanDefinition
PipelineDefinition
PolicyDefinition
+PollEnrichDefinition
ProcessDefinition
RecipientListDefinition
RedeliveryPolicyDefinition
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java (from r789205, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java Mon Jun 29 11:43:38 2009
@@ -23,7 +23,7 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-public class EnricherTest extends ContextTestSupport {
+public class PollEnricherTest extends ContextTestSupport {
private static SampleAggregator aggregationStrategy = new SampleAggregator();
@@ -44,51 +44,66 @@
// InOnly routes
// -------------------------------------------------------------
- public void testEnrichInOnly() throws InterruptedException {
+ public void testPollEnrichInOnly() throws InterruptedException {
+ template.sendBody("seda:foo1", "blah");
+
mock.expectedBodiesReceived("test:blah");
template.sendBody("direct:enricher-test-1", "test");
mock.assertIsSatisfied();
}
- public void testEnrichFaultInOnly() throws InterruptedException {
- mock.expectedMessageCount(0);
- Exchange exchange = template.send("direct:enricher-test-3", new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("test");
- }
- });
+ public void testPollEnrichInOnlyWaitWithTimeout() throws InterruptedException {
+ // this first try there is no data so we timeout
+ mock.expectedBodiesReceived("test:blah");
+ template.sendBody("direct:enricher-test-2", "test");
+ // not expected data so we are not happy
+ mock.assertIsNotSatisfied();
+
+ // now send it and try again
+ mock.reset();
+ template.sendBody("seda:foo2", "blah");
+ template.sendBody("direct:enricher-test-2", "test");
mock.assertIsSatisfied();
- assertEquals("test", exchange.getIn().getBody());
- assertEquals("failed", exchange.getFault().getBody());
- assertFalse(exchange.hasOut());
- assertNull(exchange.getException());
}
- public void testEnrichErrorInOnly() throws InterruptedException {
- mock.expectedMessageCount(0);
- Exchange exchange = template.send("direct:enricher-test-4", new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("test");
+ public void testPollEnrichInOnlyWaitNoTimeout() throws InterruptedException {
+ // use another thread to send it after 1 second
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ template.sendBody("seda:foo3", "blah");
}
});
+
+ long start = System.currentTimeMillis();
+ mock.expectedBodiesReceived("test:blah");
+ t.start();
+ template.sendBody("direct:enricher-test-3", "test");
+ // should take approx 1 sec to complete as the other thread is sending a bit later and we wait
mock.assertIsSatisfied();
- assertEquals("test", exchange.getIn().getBody());
- assertEquals("failed", exchange.getException().getMessage());
- assertFalse(exchange.hasFault());
- assertFalse(exchange.hasOut());
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take approx 1 sec: was " + delta, delta > 900);
}
// -------------------------------------------------------------
// InOut routes
// -------------------------------------------------------------
- public void testEnrichInOut() throws InterruptedException {
- String result = (String) template.sendBody("direct:enricher-test-5", ExchangePattern.InOut, "test");
+ public void testPollEnrichInOut() throws InterruptedException {
+ template.sendBody("seda:foo4", "blah");
+
+ String result = (String) template.sendBody("direct:enricher-test-4", ExchangePattern.InOut, "test");
assertEquals("test:blah", result);
}
- public void testEnrichInOutPlusHeader() throws InterruptedException {
- Exchange exchange = template.send("direct:enricher-test-5", ExchangePattern.InOut, new Processor() {
+ public void testPollEnrichInOutPlusHeader() throws InterruptedException {
+ template.sendBody("seda:foo4", "blah");
+
+ Exchange exchange = template.send("direct:enricher-test-4", ExchangePattern.InOut, new Processor() {
public void process(Exchange exchange) {
exchange.getIn().setHeader("foo", "bar");
exchange.getIn().setBody("test");
@@ -100,30 +115,6 @@
assertNull(exchange.getException());
}
- public void testEnrichFaultInOut() throws InterruptedException {
- Exchange exchange = template.send("direct:enricher-test-7", ExchangePattern.InOut, new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("test");
- }
- });
- assertEquals("test", exchange.getIn().getBody());
- assertEquals("failed", exchange.getFault().getBody());
- assertFalse(exchange.hasOut());
- assertNull(exchange.getException());
- }
-
- public void testEnrichErrorInOut() throws InterruptedException {
- Exchange exchange = template.send("direct:enricher-test-8", ExchangePattern.InOut, new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("test");
- }
- });
- assertEquals("test", exchange.getIn().getBody());
- assertEquals("failed", exchange.getException().getMessage());
- assertFalse(exchange.hasFault());
- assertFalse(exchange.hasOut());
- }
-
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
@@ -132,39 +123,25 @@
// -------------------------------------------------------------
from("direct:enricher-test-1")
- .enrich("direct:enricher-constant-resource", aggregationStrategy)
+ .pollEnrich("seda:foo1", aggregationStrategy)
.to("mock:mock");
- from("direct:enricher-test-3")
- .enrich("direct:enricher-fault-resource", aggregationStrategy)
+ from("direct:enricher-test-2")
+ .pollEnrich("seda:foo2", 1000, aggregationStrategy)
.to("mock:mock");
- from("direct:enricher-test-4").errorHandler(noErrorHandler()) // avoid re-deliveries
- .enrich("direct:enricher-error-resource", aggregationStrategy).to("mock:mock");
+ from("direct:enricher-test-3")
+ .pollEnrich("seda:foo3", -1, aggregationStrategy)
+ .to("mock:mock");
// -------------------------------------------------------------
// InOut routes
// -------------------------------------------------------------
- from("direct:enricher-test-5")
- .enrich("direct:enricher-constant-resource", aggregationStrategy);
-
- from("direct:enricher-test-7")
- .enrich("direct:enricher-fault-resource", aggregationStrategy);
-
- from("direct:enricher-test-8").errorHandler(noErrorHandler()) // avoid re-deliveries
- .enrich("direct:enricher-error-resource", aggregationStrategy);
-
- // -------------------------------------------------------------
- // Enricher resources
- // -------------------------------------------------------------
-
- from("direct:enricher-constant-resource").transform().constant("blah");
-
- from("direct:enricher-fault-resource").errorHandler(noErrorHandler()).process(new FailureProcessor(false));
- from("direct:enricher-error-resource").errorHandler(noErrorHandler()).process(new FailureProcessor(true));
+ from("direct:enricher-test-4")
+ .pollEnrich("seda:foo4", aggregationStrategy);
}
};
}
-}
+}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java Mon Jun 29 11:43:38 2009
@@ -22,6 +22,9 @@
public class SampleAggregator implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (newExchange == null) {
+ return oldExchange;
+ }
Object oldBody = oldExchange.getIn().getBody();
Object newBody = newExchange.getIn().getBody();
oldExchange.getIn().setBody(oldBody + ":" + newBody);
Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java (from r789205, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java Mon Jun 29 11:43:38 2009
@@ -17,32 +17,13 @@
package org.apache.camel.spring.processor;
import org.apache.camel.CamelContext;
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.component.mock.MockEndpoint;
-
+import org.apache.camel.processor.enricher.PollEnricherTest;
import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
-/**
- * @author Martin Krasser
- */
-public class SpringEnricherTest extends ContextTestSupport {
-
- private MockEndpoint mock;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- mock = getMockEndpoint("mock:result");
- }
+public class SpringPollEnricherTest extends PollEnricherTest {
- public void testEnrich() throws Exception {
- mock.expectedBodiesReceived("test:blah");
- template.sendBody("direct:start", "test");
- mock.assertIsSatisfied();
- }
-
protected CamelContext createCamelContext() throws Exception {
- return createSpringCamelContext(this, "org/apache/camel/spring/processor/enricher.xml");
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/pollEnricher.xml");
}
-}
+}
\ No newline at end of file
Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml (from r789205, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml Mon Jun 29 11:43:38 2009
@@ -22,22 +22,34 @@
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
">
- <!-- START SNIPPET: example -->
- <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="direct:start"/>
- <enrich uri="direct:resource" strategyRef="sampleAggregator"/>
- <to uri="mock:result"/>
- </route>
- <route>
- <from uri="direct:resource"/>
- <transform>
- <constant>blah</constant>
- </transform>
- </route>
- </camelContext>
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+ <!-- START SNIPPET: e1 -->
+ <route>
+ <from uri="direct:enricher-test-1"/>
+ <pollEnrich uri="seda:foo1" strategyRef="sampleAggregator"/>
+ <to uri="mock:mock"/>
+ </route>
+ <!-- END SNIPPET: e1 -->
- <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator" />
- <!-- END SNIPPET: example -->
+ <route>
+ <from uri="direct:enricher-test-2"/>
+ <pollEnrich uri="seda:foo2" timeout="1000" strategyRef="sampleAggregator"/>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-3"/>
+ <pollEnrich uri="seda:foo3" timeout="-1" strategyRef="sampleAggregator"/>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-4"/>
+ <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/>
+ <to uri="mock:mock"/>
+ </route>
+ </camelContext>
+
+ <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
</beans>