You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/29 13:43:38 UTC

svn commit: r789294 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/resources/org/apache/camel/model/ camel-core/src/test/java/org/apache/camel/processor/enric...

Author: davsclaus
Date: Mon Jun 29 11:43:38 2009
New Revision: 789294

URL: http://svn.apache.org/viewvc?rev=789294&view=rev
Log:
CAMEL-1694: added pollEnrich to DSL to enrich using a polling consumer. For instance to poll a file or download a FTP file.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
      - copied, changed from r789205, camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
      - copied, changed from r789205, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
      - copied, changed from r789205, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java
      - copied, changed from r789205, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
      - copied, changed from r789205, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
    camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java

Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java (from r789205, camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java Mon Jun 29 11:43:38 2009
@@ -24,55 +24,62 @@
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
-import org.apache.camel.processor.Enricher;
+import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.RouteContext;
 
 /**
- * Represents an XML <enrich/> element
+ * Represents an XML <pollEnrich/> element
  *
- * @see Enricher
+ * @see org.apache.camel.processor.PollEnricher
  */
-@XmlRootElement(name = "enrich")
+@XmlRootElement(name = "pollEnrich")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class EnrichDefinition extends OutputDefinition<EnrichDefinition> {
+public class PollEnrichDefinition extends OutputDefinition<PollEnrichDefinition> {
 
     @XmlAttribute(name = "uri", required = true)
     private String resourceUri;
-    
-    @XmlAttribute(name = "strategyRef", required = false)
+
+    @XmlAttribute(name = "timeout")
+    private Long timeout;
+
+    @XmlAttribute(name = "strategyRef")
     private String aggregationStrategyRef;
-    
+
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
-    
-    public EnrichDefinition() {
-        this(null, null);
-    }
-    
-    public EnrichDefinition(String resourceUri) {
-        this(null, resourceUri);
+
+    public PollEnrichDefinition() {
+        this(null, null, 0);
     }
-    
-    public EnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri) {
+
+    public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout) {
         this.aggregationStrategy = aggregationStrategy;
         this.resourceUri = resourceUri;
+        this.timeout = timeout;
     }
-    
+
     @Override
     public String toString() {
-        return "Enrich[" + resourceUri + " " + aggregationStrategy + "]";
+        return "PollEnrich[" + resourceUri + " " + aggregationStrategy + "]";
     }
 
     @Override
     public String getShortName() {
-        return "enrich";
+        return "pollEnrich";
     }
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
-        Enricher enricher = new Enricher(null, endpoint.createProducer());
+
+        PollEnricher enricher;
+        if (timeout != null) {
+            enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout);
+        } else {
+            enricher = new PollEnricher(null, endpoint.createPollingConsumer(), 0);
+        }
+
         if (aggregationStrategyRef != null) {
             aggregationStrategy = routeContext.lookup(aggregationStrategyRef, AggregationStrategy.class);
         }
@@ -81,7 +88,8 @@
         } else {
             enricher.setAggregationStrategy(aggregationStrategy);
         }
+
         return enricher;
     }
-    
-}
+
+}
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Mon Jun 29 11:43:38 2009
@@ -1806,8 +1806,8 @@
     }
 
     /**
-     * Enriches an exchange with additional data obtained from a
-     * <code>resourceUri</code>.
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
      * 
      * @param resourceUri           URI of resource endpoint for obtaining additional data.
      * @param aggregationStrategy   aggregation strategy to aggregate input data and additional data.
@@ -1821,8 +1821,11 @@
     }
 
     /**
-     * Enriches an exchange with additional data obtained from a
-     * <code>resourceUri</code>.
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
+     * <p/>
+     * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
+     * to obtain the additional data, whereas pollEnrich uses a polling consumer.
      *
      * @param resourceUri           URI of resource endpoint for obtaining additional data.
      * @return the builder
@@ -1835,6 +1838,96 @@
     }
 
     /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+     * <p/>
+     * The difference between this and {@link #enrich(String)} is that this uses a consumer
+     * to obtain the additional data, whereas enrich uses a producer.
+     * <p/>
+     * This method polls with a timeout of 0, i.e. it uses <tt>receiveNoWait</tt> and does not wait for data
+     * to become available; use the method with timeout if you want to wait for data from the resourceUri.
+     *
+     * @param resourceUri           URI of resource endpoint for obtaining additional data.
+     * @return the builder
+     * @see org.apache.camel.processor.PollEnricher
+     */
+    @SuppressWarnings("unchecked")
+    public Type pollEnrich(String resourceUri) {
+        addOutput(new PollEnrichDefinition(null, resourceUri, 0));
+        return (Type) this;
+    }
+
+    /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+     * <p/>
+     * The difference between this and {@link #enrich(String)} is that this uses a consumer
+     * to obtain the additional data, whereas enrich uses a producer.
+     * <p/>
+     * This method polls with a timeout of 0, i.e. it uses <tt>receiveNoWait</tt> and does not wait for data
+     * to become available; use the method with timeout if you want to wait for data from the resourceUri.
+     *
+     * @param resourceUri           URI of resource endpoint for obtaining additional data.
+     * @param aggregationStrategy   aggregation strategy to aggregate input data and additional data.
+     * @return the builder
+     * @see org.apache.camel.processor.PollEnricher
+     */
+    @SuppressWarnings("unchecked")
+    public Type pollEnrich(String resourceUri, AggregationStrategy aggregationStrategy) {
+        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, 0));
+        return (Type) this;
+    }
+
+    /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+     * <p/>
+     * The difference between this and {@link #enrich(String)} is that this uses a consumer
+     * to obtain the additional data, whereas enrich uses a producer.
+     * <p/>
+     * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+     * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
+     * otherwise we use <tt>receive(timeout)</tt>.
+     *
+     * @param resourceUri           URI of resource endpoint for obtaining additional data.
+     * @param timeout               timeout in millis to wait at most for data to be available.
+     * @param aggregationStrategy   aggregation strategy to aggregate input data and additional data.
+     * @return the builder
+     * @see org.apache.camel.processor.PollEnricher
+     */
+    @SuppressWarnings("unchecked")
+    public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy) {
+        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout));
+        return (Type) this;
+    }
+
+    /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+     * <p/>
+     * The difference between this and {@link #enrich(String)} is that this uses a consumer
+     * to obtain the additional data, whereas enrich uses a producer.
+     * <p/>
+     * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+     * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
+     * otherwise we use <tt>receive(timeout)</tt>.
+     *
+     * @param resourceUri           URI of resource endpoint for obtaining additional data.
+     * @param timeout               timeout in millis to wait at most for data to be available.
+     * @return the builder
+     * @see org.apache.camel.processor.PollEnricher
+     */
+    @SuppressWarnings("unchecked")
+    public Type pollEnrich(String resourceUri, long timeout) {
+        addOutput(new PollEnrichDefinition(null, resourceUri, timeout));
+        return (Type) this;
+    }
+
+    /**
      * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as
      * a callback when the {@link org.apache.camel.Exchange} has finished being processed.
      * The hook invoke callbacks for either onComplete or onFailure.

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Mon Jun 29 11:43:38 2009
@@ -35,6 +35,11 @@
  * and second by aggregating input data and additional data. Aggregation of
  * input data and additional data is delegated to an {@link AggregationStrategy}
  * object.
+ * <p/>
+ * Uses a {@link org.apache.camel.Producer} to obtain the additional data as opposed to {@link PollEnricher}
+ * that uses a {@link org.apache.camel.PollingConsumer}.
+ *
+ * @see PollEnricher
  */
 public class Enricher extends ServiceSupport implements Processor {
 

Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java (from r789205, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java Mon Jun 29 11:43:38 2009
@@ -18,55 +18,61 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
-import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
 
 /**
  * A content enricher that enriches input data by first obtaining additional
  * data from a <i>resource</i> represented by an endpoint <code>producer</code>
  * and second by aggregating input data and additional data. Aggregation of
- * input data and additional data is delegated to an {@link AggregationStrategy}
+ * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy}
  * object.
+ * <p/>
+ * Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher}
+ * that uses a {@link org.apache.camel.Producer}.
+ *
+ * @see Enricher
  */
-public class Enricher extends ServiceSupport implements Processor {
+public class PollEnricher extends ServiceSupport implements Processor {
 
-    private static final transient Log LOG = LogFactory.getLog(Enricher.class);
+    private static final transient Log LOG = LogFactory.getLog(PollEnricher.class);
     private AggregationStrategy aggregationStrategy;
-    private Producer producer;
+    private PollingConsumer consumer;
+    private long timeout;
 
     /**
-     * Creates a new {@link Enricher}. The default aggregation strategy is to
+     * Creates a new {@link PollEnricher}. The default aggregation strategy is to
      * copy the additional data obtained from the enricher's resource over the
      * input data. When using the copy aggregation strategy the enricher
      * degenerates to a normal transformer.
-     * 
-     * @param producer producer to resource endpoint.
+     *
+     * @param consumer consumer to resource endpoint.
      */
-    public Enricher(Producer producer) {
-        this(defaultAggregationStrategy(), producer);
+    public PollEnricher(PollingConsumer consumer) {
+        this(defaultAggregationStrategy(), consumer, 0);
     }
 
     /**
-     * Creates a new {@link Enricher}.
-     * 
+     * Creates a new {@link PollEnricher}.
+     *
      * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
-     * @param producer producer to resource endpoint.
+     * @param consumer consumer to resource endpoint.
      */
-    public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
+    public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
         this.aggregationStrategy = aggregationStrategy;
-        this.producer = producer;
+        this.consumer = consumer;
+        this.timeout = timeout;
     }
 
     /**
-     * Sets the aggregation strategy for this enricher.
+     * Sets the aggregation strategy for this poll enricher.
      *
      * @param aggregationStrategy the aggregationStrategy to set
      */
@@ -75,29 +81,55 @@
     }
 
     /**
-     * Sets the default aggregation strategy for this enricher.
+     * Sets the default aggregation strategy for this poll enricher.
      */
     public void setDefaultAggregationStrategy() {
         this.aggregationStrategy = defaultAggregationStrategy();
     }
 
     /**
+     * Sets the timeout to use when polling.
+     * <p/>
+     * Use a negative value to block until data is available, 0 to not wait at all, otherwise the timeout in millis.
+     *
+     * @param timeout timeout in millis.
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
      * Enriches the input data (<code>exchange</code>) by first obtaining
      * additional data from an endpoint represented by an endpoint
      * <code>producer</code> and second by aggregating input data and additional
      * data. Aggregation of input data and additional data is delegated to an
-     * {@link AggregationStrategy} object set at construction time. If the
+     * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the
      * message exchange with the resource endpoint fails then no aggregation
      * will be done and the failed exchange content is copied over to the
      * original message exchange.
-     * 
+     *
      * @param exchange input data.
      */
     public void process(Exchange exchange) throws Exception {
-        Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
-        producer.process(resourceExchange);
+        Exchange resourceExchange;
+        if (timeout < 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Consumer receive: " + consumer);
+            }
+            resourceExchange = consumer.receive();
+        } else if (timeout == 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Consumer receiveNoWait: " + consumer);
+            }
+            resourceExchange = consumer.receiveNoWait();
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Consumer receive with timeout: " + timeout + " ms. " + consumer);
+            }
+            resourceExchange = consumer.receive(timeout);
+        }
 
-        if (resourceExchange.isFailed()) {
+        if (resourceExchange != null && resourceExchange.isFailed()) {
             // copy resource exchange onto original exchange (preserving pattern)
             copyResultsPreservePattern(exchange, resourceExchange);
         } else {
@@ -105,7 +137,10 @@
 
             // aggregate original exchange and resource exchange
             // but do not aggregate if the resource exchange was filtered
-            Boolean filtered = resourceExchange.getProperty(Exchange.FILTERED, Boolean.class);
+            Boolean filtered = null;
+            if (resourceExchange != null) {
+                filtered = resourceExchange.getProperty(Exchange.FILTERED, Boolean.class);
+            }
             if (filtered == null || !filtered) {
                 // prepare the exchanges for aggregation
                 ExchangeHelper.prepareAggregation(exchange, resourceExchange);
@@ -121,7 +156,7 @@
     }
 
     /**
-     * Creates a new {@link DefaultExchange} instance from the given
+     * Creates a new {@link org.apache.camel.impl.DefaultExchange} instance from the given
      * <code>exchange</code>. The resulting exchange's pattern is defined by
      * <code>pattern</code>.
      *
@@ -148,15 +183,15 @@
 
     @Override
     public String toString() {
-        return "Enrich[" + producer.getEndpoint().getEndpointUri() + "]";
+        return "PollEnrich[" + consumer + "]";
     }
 
     protected void doStart() throws Exception {
-        producer.start();
+        consumer.start();
     }
 
     protected void doStop() throws Exception {
-        producer.stop();
+        consumer.stop();
     }
 
     private static class CopyAggregationStrategy implements AggregationStrategy {
@@ -168,4 +203,4 @@
 
     }
 
-}
+}
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Mon Jun 29 11:43:38 2009
@@ -45,6 +45,7 @@
 PackageScanDefinition
 PipelineDefinition
 PolicyDefinition
+PollEnrichDefinition
 ProcessDefinition
 RecipientListDefinition
 RedeliveryPolicyDefinition

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java (from r789205, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java Mon Jun 29 11:43:38 2009
@@ -23,7 +23,7 @@
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
-public class EnricherTest extends ContextTestSupport {
+public class PollEnricherTest extends ContextTestSupport {
 
     private static SampleAggregator aggregationStrategy = new SampleAggregator();
 
@@ -44,51 +44,66 @@
     //  InOnly routes
     // -------------------------------------------------------------
 
-    public void testEnrichInOnly() throws InterruptedException {
+    public void testPollEnrichInOnly() throws InterruptedException {
+        template.sendBody("seda:foo1", "blah");
+
         mock.expectedBodiesReceived("test:blah");
         template.sendBody("direct:enricher-test-1", "test");
         mock.assertIsSatisfied();
     }
 
-    public void testEnrichFaultInOnly() throws InterruptedException {
-        mock.expectedMessageCount(0);
-        Exchange exchange = template.send("direct:enricher-test-3", new Processor() {
-            public void process(Exchange exchange) {
-                exchange.getIn().setBody("test");
-            }
-        });
+    public void testPollEnrichInOnlyWaitWithTimeout() throws InterruptedException {
+        // this first try there is no data so we timeout
+        mock.expectedBodiesReceived("test:blah");
+        template.sendBody("direct:enricher-test-2", "test");
+        // not expected data so we are not happy
+        mock.assertIsNotSatisfied();
+
+        // now send it and try again
+        mock.reset();
+        template.sendBody("seda:foo2", "blah");
+        template.sendBody("direct:enricher-test-2", "test");
         mock.assertIsSatisfied();
-        assertEquals("test", exchange.getIn().getBody());
-        assertEquals("failed", exchange.getFault().getBody());
-        assertFalse(exchange.hasOut());
-        assertNull(exchange.getException());
     }
 
-    public void testEnrichErrorInOnly() throws InterruptedException {
-        mock.expectedMessageCount(0);
-        Exchange exchange = template.send("direct:enricher-test-4", new Processor() {
-            public void process(Exchange exchange) {
-                exchange.getIn().setBody("test");
+    public void testPollEnrichInOnlyWaitNoTimeout() throws InterruptedException {
+        // use another thread to send it after 1 second
+        Thread t = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+                template.sendBody("seda:foo3", "blah");
             }
         });
+
+        long start = System.currentTimeMillis();
+        mock.expectedBodiesReceived("test:blah");
+        t.start();
+        template.sendBody("direct:enricher-test-3", "test");
+        // should take approx 1 sec to complete as the other thread is sending a bit later and we wait
         mock.assertIsSatisfied();
-        assertEquals("test", exchange.getIn().getBody());
-        assertEquals("failed", exchange.getException().getMessage());
-        assertFalse(exchange.hasFault());
-        assertFalse(exchange.hasOut());
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take approx 1 sec: was " + delta, delta > 900);
     }
 
     // -------------------------------------------------------------
     //  InOut routes
     // -------------------------------------------------------------
 
-    public void testEnrichInOut() throws InterruptedException {
-        String result = (String) template.sendBody("direct:enricher-test-5", ExchangePattern.InOut, "test");
+    public void testPollEnrichInOut() throws InterruptedException {
+        template.sendBody("seda:foo4", "blah");
+
+        String result = (String) template.sendBody("direct:enricher-test-4", ExchangePattern.InOut, "test");
         assertEquals("test:blah", result);
     }
 
-    public void testEnrichInOutPlusHeader() throws InterruptedException {
-        Exchange exchange = template.send("direct:enricher-test-5", ExchangePattern.InOut, new Processor() {
+    public void testPollEnrichInOutPlusHeader() throws InterruptedException {
+        template.sendBody("seda:foo4", "blah");
+
+        Exchange exchange = template.send("direct:enricher-test-4", ExchangePattern.InOut, new Processor() {
             public void process(Exchange exchange) {
                 exchange.getIn().setHeader("foo", "bar");
                 exchange.getIn().setBody("test");
@@ -100,30 +115,6 @@
         assertNull(exchange.getException());
     }
 
-    public void testEnrichFaultInOut() throws InterruptedException {
-        Exchange exchange = template.send("direct:enricher-test-7", ExchangePattern.InOut, new Processor() {
-            public void process(Exchange exchange) {
-                exchange.getIn().setBody("test");
-            }
-        });
-        assertEquals("test", exchange.getIn().getBody());
-        assertEquals("failed", exchange.getFault().getBody());
-        assertFalse(exchange.hasOut());
-        assertNull(exchange.getException());
-    }
-
-    public void testEnrichErrorInOut() throws InterruptedException {
-        Exchange exchange = template.send("direct:enricher-test-8", ExchangePattern.InOut, new Processor() {
-            public void process(Exchange exchange) {
-                exchange.getIn().setBody("test");
-            }
-        });
-        assertEquals("test", exchange.getIn().getBody());
-        assertEquals("failed", exchange.getException().getMessage());
-        assertFalse(exchange.hasFault());
-        assertFalse(exchange.hasOut());
-    }
-
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
@@ -132,39 +123,25 @@
                 // -------------------------------------------------------------
 
                 from("direct:enricher-test-1")
-                    .enrich("direct:enricher-constant-resource", aggregationStrategy)
+                    .pollEnrich("seda:foo1", aggregationStrategy)
                     .to("mock:mock");
 
-                from("direct:enricher-test-3")
-                    .enrich("direct:enricher-fault-resource", aggregationStrategy)
+                from("direct:enricher-test-2")
+                    .pollEnrich("seda:foo2", 1000, aggregationStrategy)
                     .to("mock:mock");
 
-                from("direct:enricher-test-4").errorHandler(noErrorHandler()) // avoid re-deliveries
-                    .enrich("direct:enricher-error-resource", aggregationStrategy).to("mock:mock");
+                from("direct:enricher-test-3")
+                    .pollEnrich("seda:foo3", -1, aggregationStrategy)
+                    .to("mock:mock");
 
                 // -------------------------------------------------------------
                 //  InOut routes
                 // -------------------------------------------------------------
 
-                from("direct:enricher-test-5")
-                    .enrich("direct:enricher-constant-resource", aggregationStrategy);
-
-                from("direct:enricher-test-7")
-                    .enrich("direct:enricher-fault-resource", aggregationStrategy);
-
-                from("direct:enricher-test-8").errorHandler(noErrorHandler()) // avoid re-deliveries
-                    .enrich("direct:enricher-error-resource", aggregationStrategy);
-
-                // -------------------------------------------------------------
-                //  Enricher resources
-                // -------------------------------------------------------------
-
-                from("direct:enricher-constant-resource").transform().constant("blah");
-                
-                from("direct:enricher-fault-resource").errorHandler(noErrorHandler()).process(new FailureProcessor(false));
-                from("direct:enricher-error-resource").errorHandler(noErrorHandler()).process(new FailureProcessor(true));
+                from("direct:enricher-test-4")
+                    .pollEnrich("seda:foo4", aggregationStrategy);
             }
         };
     }
 
-}
+}
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java Mon Jun 29 11:43:38 2009
@@ -22,6 +22,9 @@
 public class SampleAggregator implements AggregationStrategy {
 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (newExchange == null) {
+            return oldExchange;
+        }
         Object oldBody = oldExchange.getIn().getBody();
         Object newBody = newExchange.getIn().getBody();
         oldExchange.getIn().setBody(oldBody + ":" + newBody);

Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java (from r789205, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java Mon Jun 29 11:43:38 2009
@@ -17,32 +17,13 @@
 package org.apache.camel.spring.processor;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.component.mock.MockEndpoint;
-
+import org.apache.camel.processor.enricher.PollEnricherTest;
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
-/**
- * @author Martin Krasser
- */
-public class SpringEnricherTest extends ContextTestSupport {
-
-    private MockEndpoint mock;
-    
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        mock = getMockEndpoint("mock:result");
-    }
+public class SpringPollEnricherTest extends PollEnricherTest {
 
-    public void testEnrich() throws Exception {
-        mock.expectedBodiesReceived("test:blah");
-        template.sendBody("direct:start", "test");
-        mock.assertIsSatisfied();
-    }
-    
     protected CamelContext createCamelContext() throws Exception {
-        return createSpringCamelContext(this, "org/apache/camel/spring/processor/enricher.xml");
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/pollEnricher.xml");
     }
 
-}
+}
\ No newline at end of file

Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml (from r789205, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml Mon Jun 29 11:43:38 2009
@@ -22,22 +22,34 @@
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
 
-  <!-- START SNIPPET: example -->
-  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
-    <route>
-      <from uri="direct:start"/>
-      <enrich uri="direct:resource" strategyRef="sampleAggregator"/>
-      <to uri="mock:result"/>
-    </route>
-    <route>
-      <from uri="direct:resource"/>
-      <transform>
-        <constant>blah</constant>
-      </transform>
-    </route>
-  </camelContext>
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <!-- START SNIPPET: e1 -->
+        <route>
+            <from uri="direct:enricher-test-1"/>
+            <pollEnrich uri="seda:foo1" strategyRef="sampleAggregator"/>
+            <to uri="mock:mock"/>
+        </route>
+        <!-- END SNIPPET: e1 -->
 
-  <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator" />
-  <!-- END SNIPPET: example -->
+        <route>
+            <from uri="direct:enricher-test-2"/>
+            <pollEnrich uri="seda:foo2" timeout="1000" strategyRef="sampleAggregator"/>
+            <to uri="mock:mock"/>
+        </route>
+
+        <route>
+            <from uri="direct:enricher-test-3"/>
+            <pollEnrich uri="seda:foo3" timeout="-1" strategyRef="sampleAggregator"/>
+            <to uri="mock:mock"/>
+        </route>
+
+        <route>
+            <from uri="direct:enricher-test-4"/>
+            <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/>
+            <to uri="mock:mock"/>
+        </route>
+    </camelContext>
+
+    <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
 
 </beans>