You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/02 15:15:01 UTC

svn commit: r959977 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/interceptor/ main/java/org/apache/camel/spi/ main/java/o...

Author: davsclaus
Date: Fri Jul  2 13:14:59 2010
New Revision: 959977

URL: http://svn.apache.org/viewvc?rev=959977&view=rev
Log:
CAMEL-2876: Delayer and throttle EIP supports non blocking delays using .asyncDelayed() option.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java
      - copied, changed from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java
      - copied, changed from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
    camel/trunk/camel-core/src/test/resources/log4j.properties

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Fri Jul  2 13:14:59 2010
@@ -155,6 +155,29 @@ public class DefaultExecutorServiceStrat
         return answer;
     }
 
+    public ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef) {
+        ScheduledExecutorService answer = camelContext.getRegistry().lookup(executorServiceRef, ScheduledExecutorService.class);
+        if (answer != null && LOG.isDebugEnabled()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Looking up ScheduledExecutorService with ref: " + executorServiceRef + " and found it from Registry: " + answer);
+            }
+        }
+
+        if (answer == null) {
+            ThreadPoolProfile profile = getThreadPoolProfile(name);
+            if (profile != null) {
+                int poolSize = profile.getPoolSize();
+                answer = newScheduledThreadPool(source, name, poolSize);
+                if (answer != null && LOG.isDebugEnabled()) {
+                    LOG.debug("Looking up ScheduledExecutorService with ref: " + executorServiceRef
+                            + " and found a matching ThreadPoolProfile to create the ScheduledExecutorService: " + answer);
+                }
+            }
+        }
+
+        return answer;
+    }
+
     public ExecutorService newDefaultThreadPool(Object source, String name) {
         ThreadPoolProfile profile = getDefaultThreadPoolProfile();
         ObjectHelper.notNull(profile, "DefaultThreadPoolProfile");
@@ -194,6 +217,11 @@ public class DefaultExecutorServiceStrat
         return answer;
     }
 
+    public ScheduledExecutorService newScheduledThreadPool(Object source, String name) {
+        int poolSize = getDefaultThreadPoolProfile().getPoolSize();
+        return newScheduledThreadPool(source, name, poolSize);
+    }
+
     public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
         ScheduledExecutorService answer = ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, true);
         onThreadPoolCreated(answer);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java Fri Jul  2 13:14:59 2010
@@ -16,9 +16,13 @@
  */
 package org.apache.camel.model;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
@@ -27,6 +31,7 @@ import org.apache.camel.model.language.E
 import org.apache.camel.processor.Delayer;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML <delay/> element
@@ -35,7 +40,16 @@ import org.apache.camel.util.ObjectHelpe
  */
 @XmlRootElement(name = "delay")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class DelayDefinition extends ExpressionNode {
+public class DelayDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<DelayDefinition> {
+
+    @XmlTransient
+    private ExecutorService executorService;
+    @XmlAttribute(required = false)
+    private String executorServiceRef;
+    @XmlAttribute
+    private Boolean asyncDelayed;
+    @XmlAttribute
+    private Boolean callerRunsWhenRejected = Boolean.TRUE;
 
     public DelayDefinition() {
     }
@@ -59,6 +73,39 @@ public class DelayDefinition extends Exp
         return "Delay[" + getExpression() + " -> " + getOutputs() + "]";
     }
 
+    @Override
+    public Processor createProcessor(RouteContext routeContext) throws Exception {
+        Processor childProcessor = this.createChildProcessor(routeContext, false);
+        Expression delay = createAbsoluteTimeDelayExpression(routeContext);
+
+        ScheduledExecutorService scheduled = null;
+        if (getAsyncDelayed() != null && getAsyncDelayed()) {
+            scheduled = ExecutorServiceHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", this);
+            if (scheduled == null) {
+                scheduled = routeContext.getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "Delay");
+            }
+        }
+
+        Delayer answer = new Delayer(childProcessor, delay, scheduled);
+        if (getAsyncDelayed() != null) {
+            answer.setAsyncDelayed(getAsyncDelayed());
+        }
+        if (getCallerRunsWhenRejected() != null) {
+            answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
+        }
+        return answer;
+    }
+
+    private Expression createAbsoluteTimeDelayExpression(RouteContext routeContext) {
+        ExpressionDefinition expr = getExpression();
+        if (expr != null) {
+            if (ObjectHelper.isNotEmpty(expr.getExpression()) || expr.getExpressionValue() != null) {
+                return expr.createExpression(routeContext);
+            } 
+        } 
+        return null;
+    }
+
     // Fluent API
     // -------------------------------------------------------------------------
 
@@ -72,21 +119,72 @@ public class DelayDefinition extends Exp
         setExpression(new ExpressionDefinition(ExpressionBuilder.constantExpression(delay)));
         return this;
     }
-    
-    @Override
-    public Processor createProcessor(RouteContext routeContext) throws Exception {
-        Processor childProcessor = this.createChildProcessor(routeContext, false);
-        Expression delay = createAbsoluteTimeDelayExpression(routeContext);
-        return new Delayer(childProcessor, delay);
+
+    /**
+     * Whether or not the caller should run the task when it was rejected by the thread pool.
+     * <p/>
+     * Is by default <tt>true</tt>
+     *
+     * @param callerRunsWhenRejected whether or not the caller should run
+     * @return the builder
+     */
+    public DelayDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
+        setCallerRunsWhenRejected(callerRunsWhenRejected);
+        return this;
     }
 
-    private Expression createAbsoluteTimeDelayExpression(RouteContext routeContext) {
-        ExpressionDefinition expr = getExpression();
-        if (expr != null) {
-            if (ObjectHelper.isNotEmpty(expr.getExpression()) || expr.getExpressionValue() != null) {
-                return expr.createExpression(routeContext);
-            } 
-        } 
-        return null;
+    /**
+     * Enables asynchronous delay which means the thread will <b>noy</b> block while delaying.
+     *
+     * @return the builder
+     */
+    public DelayDefinition asyncDelayed() {
+        setAsyncDelayed(true);
+        return this;
+    }
+
+    public DelayDefinition executorService(ExecutorService executorService) {
+        setExecutorService(executorService);
+        return this;
+    }
+
+    public DelayDefinition executorServiceRef(String executorServiceRef) {
+        setExecutorServiceRef(executorServiceRef);
+        return this;
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+
+    public Boolean getAsyncDelayed() {
+        return asyncDelayed;
+    }
+
+    public void setAsyncDelayed(Boolean asyncDelayed) {
+        this.asyncDelayed = asyncDelayed;
+    }
+
+    public Boolean getCallerRunsWhenRejected() {
+        return callerRunsWhenRejected;
+    }
+
+    public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
+        this.callerRunsWhenRejected = callerRunsWhenRejected;
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    public String getExecutorServiceRef() {
+        return executorServiceRef;
+    }
+
+    public void setExecutorServiceRef(String executorServiceRef) {
+        this.executorServiceRef = executorServiceRef;
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java Fri Jul  2 13:14:59 2010
@@ -16,16 +16,18 @@
  */
 package org.apache.camel.model;
 
-import java.util.List;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Processor;
 import org.apache.camel.processor.Throttler;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
 /**
  * Represents an XML &lt;throttle/&gt; element
@@ -34,11 +36,19 @@ import org.apache.camel.spi.RouteContext
  */
 @XmlRootElement(name = "throttle")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ThrottleDefinition extends OutputDefinition<ThrottleDefinition> {
+public class ThrottleDefinition extends OutputDefinition<ThrottleDefinition> implements ExecutorServiceAwareDefinition<ThrottleDefinition> {
+    @XmlTransient
+    private ExecutorService executorService;
+    @XmlAttribute(required = false)
+    private String executorServiceRef;
     @XmlAttribute
     private Long maximumRequestsPerPeriod;
     @XmlAttribute
     private long timePeriodMillis = 1000;
+    @XmlAttribute
+    private Boolean asyncDelayed;
+    @XmlAttribute
+    private Boolean callerRunsWhenRejected = Boolean.TRUE;
 
     public ThrottleDefinition() {
     }
@@ -66,7 +76,23 @@ public class ThrottleDefinition extends 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = this.createChildProcessor(routeContext, true);
-        return new Throttler(childProcessor, maximumRequestsPerPeriod, timePeriodMillis);
+
+        ScheduledExecutorService scheduled = null;
+        if (getAsyncDelayed() != null && getAsyncDelayed()) {
+            scheduled = ExecutorServiceHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this);
+            if (scheduled == null) {
+                scheduled = routeContext.getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "Throttle");
+            }
+        }
+
+        Throttler answer = new Throttler(childProcessor, maximumRequestsPerPeriod, timePeriodMillis, scheduled);
+        if (getAsyncDelayed() != null) {
+            answer.setAsyncDelayed(getAsyncDelayed());
+        }
+        if (getCallerRunsWhenRejected() != null) {
+            answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
+        }
+        return answer;
     }
 
     // Fluent API
@@ -94,6 +120,39 @@ public class ThrottleDefinition extends 
         return this;
     }
 
+    /**
+     * Whether or not the caller should run the task when it was rejected by the thread pool.
+     * <p/>
+     * Is by default <tt>true</tt>
+     *
+     * @param callerRunsWhenRejected whether or not the caller should run
+     * @return the builder
+     */
+    public ThrottleDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
+        setCallerRunsWhenRejected(callerRunsWhenRejected);
+        return this;
+    }
+
+    /**
+     * Enables asynchronous delay which means the thread will <b>noy</b> block while delaying.
+     *
+     * @return the builder
+     */
+    public ThrottleDefinition asyncDelayed() {
+        setAsyncDelayed(true);
+        return this;
+    }
+
+    public ThrottleDefinition executorService(ExecutorService executorService) {
+        setExecutorService(executorService);
+        return this;
+    }
+
+    public ThrottleDefinition executorServiceRef(String executorServiceRef) {
+        setExecutorServiceRef(executorServiceRef);
+        return this;
+    }
+
     // Properties
     // -------------------------------------------------------------------------
 
@@ -113,4 +172,35 @@ public class ThrottleDefinition extends 
         this.timePeriodMillis = timePeriodMillis;
     }
 
+    public Boolean getAsyncDelayed() {
+        return asyncDelayed;
+    }
+
+    public void setAsyncDelayed(Boolean asyncDelayed) {
+        this.asyncDelayed = asyncDelayed;
+    }
+
+    public Boolean getCallerRunsWhenRejected() {
+        return callerRunsWhenRejected;
+    }
+
+    public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
+        this.callerRunsWhenRejected = callerRunsWhenRejected;
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    public String getExecutorServiceRef() {
+        return executorServiceRef;
+    }
+
+    public void setExecutorServiceRef(String executorServiceRef) {
+        this.executorServiceRef = executorServiceRef;
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Fri Jul  2 13:14:59 2010
@@ -16,13 +16,14 @@
  */
 package org.apache.camel.processor;
 
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.AlreadyStoppedException;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -36,83 +37,140 @@ import org.apache.commons.logging.LogFac
  */
 public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
     protected final transient Log log = LogFactory.getLog(getClass());
-    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
-    private boolean fastStop = true;
+    private final ScheduledExecutorService executorService;
+    private boolean asyncDelayed;
+    private boolean callerRunsWhenRejected = true;
+
+    // TODO: Add option to cancel tasks on shutdown so we can stop fast
+
+    private final class ProcessCall implements Runnable {
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+
+        public ProcessCall(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.callback = callback;
+        }
+
+        public void run() {
+            if (log.isTraceEnabled()) {
+                log.trace("Delayed task woke up and continues routing for exchangeId: " + exchange.getExchangeId());
+            }
+            if (!isRunAllowed()) {
+                exchange.setException(new RejectedExecutionException("Run is not allowed"));
+            }
+            DelayProcessorSupport.super.process(exchange, callback);
+            // signal callback we are done async
+            callback.done(false);
+        }
+    }
 
     public DelayProcessorSupport(Processor processor) {
+        this(processor, null);
+    }
+
+    public DelayProcessorSupport(Processor processor, ScheduledExecutorService executorService) {
         super(processor);
+        this.executorService = executorService;
     }
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        try {
-            delay(exchange);
-        } catch (Exception e) {
-            // exception occurred so we are done
-            exchange.setException(e);
+        if (!isRunAllowed()) {
+            exchange.setException(new RejectedExecutionException("Run is not allowed"));
             callback.done(true);
             return true;
         }
-        return super.process(exchange, callback);
+
+        // calculate delay and wait
+        long delay = calculateDelay(exchange);
+        if (delay <= 0) {
+            // no delay then continue routing
+            return super.process(exchange, callback);
+        }
+
+        if (!isAsyncDelayed() || exchange.isTransacted()) {
+            // use synchronous delay (also required if using transactions)
+            try {
+                delay(delay, exchange);
+                // then continue routing
+                return super.process(exchange, callback);
+            } catch (Exception e) {
+                // exception occurred so we are done
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+        } else {
+            // asynchronous delay so schedule a process call task
+            ProcessCall call = new ProcessCall(exchange, callback);
+            try {
+                if (log.isTraceEnabled()) {
+                    log.trace("Scheduling delayed task to run in " + delay + " millis for exchangeId: " + exchange.getExchangeId());
+                }
+                executorService.schedule(call, delay, TimeUnit.MILLISECONDS);
+                // tell Camel routing engine we continue routing asynchronous
+                return false;
+            } catch (RejectedExecutionException e) {
+                if (isCallerRunsWhenRejected()) {
+                    if (!isRunAllowed()) {
+                        exchange.setException(new RejectedExecutionException());
+                    } else {
+                        // let caller run by processing
+                        delay(delay, exchange);
+                        // then continue routing
+                        return super.process(exchange, callback);
+                    }
+                } else {
+                    exchange.setException(e);
+                }
+                // caller don't run the task so we are done
+                callback.done(true);
+                return true;
+            }
+        }
     }
 
-    public boolean isFastStop() {
-        return fastStop;
+    public boolean isAsyncDelayed() {
+        return asyncDelayed;
     }
 
-    /**
-     * Enables & disables a fast stop; basically to avoid waiting a possibly
-     * long time for delays to complete before the context shuts down; instead
-     * the current processing method throws
-     * {@link org.apache.camel.AlreadyStoppedException} to terminate processing.
-     */
-    public void setFastStop(boolean fastStop) {
-        this.fastStop = fastStop;
+    public void setAsyncDelayed(boolean asyncDelayed) {
+        this.asyncDelayed = asyncDelayed;
+    }
+
+    public boolean isCallerRunsWhenRejected() {
+        return callerRunsWhenRejected;
     }
 
-    protected void doStop() throws Exception {
-        stoppedLatch.countDown();
-        super.doStop();
+    public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
+        this.callerRunsWhenRejected = callerRunsWhenRejected;
     }
 
-    protected abstract void delay(Exchange exchange) throws Exception;
+    protected abstract long calculateDelay(Exchange exchange);
 
     /**
-     * Wait until the given system time before continuing
+     * Delays the given time before continuing.
+     * <p/>
+     * This implementation will block while waiting
      * 
-     * @param time the system time to wait for
+     * @param delay the delay time in millis
      * @param exchange the exchange being processed
      */
-    protected void waitUntil(long time, Exchange exchange) throws Exception {
+    protected void delay(long delay, Exchange exchange) {
         // only run is we are started
-        while (isRunAllowed()) {
-            long delay = time - currentSystemTime();
-            if (delay < 0) {
-                return;
-            } else {
-                if (isFastStop() && !isRunAllowed()) {
-                    throw new AlreadyStoppedException();
-                }
-                try {
-                    sleep(delay);
-                } catch (InterruptedException e) {
-                    handleSleepInterruptedException(e);
-                }
-            }
+        if (!isRunAllowed()) {
+            return;
         }
-    }
 
-    protected void sleep(long delay) throws InterruptedException {
-        if (delay <= 0) {
+        if (delay < 0) {
             return;
-        }
-        if (log.isTraceEnabled()) {
-            log.trace("Sleeping for: " + delay + " millis");
-        }
-        if (isFastStop()) {
-            stoppedLatch.await(delay, TimeUnit.MILLISECONDS);
         } else {
-            Thread.sleep(delay);
+            try {
+                sleep(delay);
+            } catch (InterruptedException e) {
+                handleSleepInterruptedException(e);
+            }
         }
     }
 
@@ -129,4 +187,22 @@ public abstract class DelayProcessorSupp
     protected long currentSystemTime() {
         return System.currentTimeMillis();
     }
+
+    private void sleep(long delay) throws InterruptedException {
+        if (delay <= 0) {
+            return;
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("Sleeping for: " + delay + " millis");
+        }
+        Thread.sleep(delay);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (isAsyncDelayed()) {
+            ObjectHelper.notNull(executorService, "executorService", this);
+        }
+        super.doStart();
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java Fri Jul  2 13:14:59 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
@@ -33,8 +35,8 @@ public class Delayer extends DelayProces
     private Expression delay;
     private long delayValue;
 
-    public Delayer(Processor processor, Expression delay) {
-        super(processor);
+    public Delayer(Processor processor, Expression delay, ScheduledExecutorService executorService) {
+        super(processor, executorService);
         this.delay = delay;
     }
 
@@ -62,11 +64,7 @@ public class Delayer extends DelayProces
     // Implementation methods
     // -------------------------------------------------------------------------
 
-    /**
-     * Waits for an optional time period before continuing to process the
-     * exchange
-     */
-    protected void delay(Exchange exchange) throws Exception {
+    protected long calculateDelay(Exchange exchange) {
         long time = 0;
         if (delay != null) {
             Long longValue = delay.evaluate(exchange, Long.class);
@@ -79,21 +77,10 @@ public class Delayer extends DelayProces
         }
         if (time <= 0) {
             // no delay
-            return;
+            return 0;
         }
 
-        // now add the current time
-        time += defaultProcessTime(exchange);
-
-        waitUntil(time, exchange);
-    }
-
-    /**
-     * A Strategy Method to allow derived implementations to decide the current
-     * system time or some other default exchange property
-     */
-    protected long defaultProcessTime(Exchange exchange) {
-        return currentSystemTime();
+        return time;
     }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Fri Jul  2 13:14:59 2010
@@ -77,6 +77,7 @@ public class ThreadsProcessor extends Se
         ProcessCall call = new ProcessCall(exchange, callback);
         try {
             executorService.submit(call);
+            // tell Camel routing engine we continue routing asynchronous
             return false;
         } catch (RejectedExecutionException e) {
             if (isCallerRunsWhenRejected()) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Fri Jul  2 13:14:59 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
@@ -33,14 +35,14 @@ import org.apache.camel.Processor;
 public class Throttler extends DelayProcessorSupport implements Traceable {
     private long maximumRequestsPerPeriod;
     private long timePeriodMillis;
-    private TimeSlot slot;
+    private volatile TimeSlot slot;
 
     public Throttler(Processor processor, long maximumRequestsPerPeriod) {
-        this(processor, maximumRequestsPerPeriod, 1000);
+        this(processor, maximumRequestsPerPeriod, 1000, null);
     }
 
-    public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis) {
-        super(processor);
+    public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis, ScheduledExecutorService executorService) {
+        super(processor, executorService);
         this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
         this.timePeriodMillis = timePeriodMillis;
     }
@@ -81,10 +83,14 @@ public class Throttler extends DelayProc
 
     // Implementation methods
     // -----------------------------------------------------------------------
-    protected void delay(Exchange exchange) throws Exception {
+
+    protected long calculateDelay(Exchange exchange) {
         TimeSlot slot = nextSlot();
         if (!slot.isActive()) {
-            waitUntil(slot.startTime, exchange);
+            long delay = slot.startTime - currentSystemTime();
+            return delay;
+        } else {
+            return 0;
         }
     }
     
@@ -107,7 +113,7 @@ public class Throttler extends DelayProc
     */
     protected class TimeSlot {
         
-        private long capacity = Throttler.this.maximumRequestsPerPeriod;
+        private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
         private final long duration = Throttler.this.timePeriodMillis;
         private final long startTime;
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java Fri Jul  2 13:14:59 2010
@@ -40,10 +40,11 @@ public class DelayInterceptor extends De
         return "DelayInterceptor[delay: " + delayer.getDelay() + " on: " + node + "]";
     }
 
-    public void delay(Exchange exchange) throws Exception {
+    public long calculateDelay(Exchange exchange) {
         if (delayer.isEnabled()) {
-            long time = currentSystemTime() + delayer.getDelay();
-            waitUntil(time, exchange);
+            return delayer.getDelay();
+        } else {
+            return 0;
         }
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Fri Jul  2 13:14:59 2010
@@ -112,6 +112,17 @@ public interface ExecutorServiceStrategy
     ExecutorService lookup(Object source, String name, String executorServiceRef);
 
     /**
+     * Lookup a {@link java.util.concurrent.ScheduledExecutorService} from the {@link org.apache.camel.spi.Registry}
+     * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.
+     *
+     * @param source               the source object, usually it should be <tt>this</tt> passed in as parameter
+     * @param name                 name which is appended to the thread name
+     * @param executorServiceRef   reference to lookup
+     * @return the {@link java.util.concurrent.ScheduledExecutorService} or <tt>null</tt> if not found
+     */
+    ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef);
+
+    /**
      * Creates a new thread pool using the default thread pool profile.
      *
      * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
@@ -141,6 +152,8 @@ public interface ExecutorServiceStrategy
 
     /**
      * Creates a new scheduled thread pool.
+     * <p/>
+     * Will use the pool size from the default thread pool profile
      *
      * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
      * @param name        name which is appended to the thread name
@@ -150,6 +163,15 @@ public interface ExecutorServiceStrategy
     ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize);
 
     /**
+     * Creates a new scheduled thread pool.
+     *
+     * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @return the created thread pool
+     */
+    ScheduledExecutorService newScheduledThreadPool(Object source, String name);
+
+    /**
      * Creates a new fixed thread pool.
      *
      * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java Fri Jul  2 13:14:59 2010
@@ -1256,7 +1256,7 @@ public final class ObjectHelper {
         return "0x" + Integer.toHexString(System.identityHashCode(object));
     }
 
-    private static class ExceptionIterator implements Iterator<Throwable> {
+    private static final class ExceptionIterator implements Iterator<Throwable> {
         private List<Throwable> tree = new ArrayList<Throwable>();
         private Iterator<Throwable> it;
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Jul  2 13:14:59 2010
@@ -261,4 +261,48 @@ public final class ExecutorServiceHelper
         return null;
     }
 
+    /**
+     * Will lookup and get the configured {@link java.util.concurrent.ScheduledExecutorService} from the given definition.
+     * <p/>
+     * This method will lookup for configured thread pool in the following order
+     * <ul>
+     *   <li>from the definition if any explicit configured executor service.</li>
+     *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
+     *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
+     *   <li>if none found, then <tt>null</tt> is returned.</li>
+     * </ul>
+     * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support
+     * configured executor services in the same coherent way.
+     *
+     * @param routeContext   the rout context
+     * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
+     *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
+     * @param definition     the node definition which may leverage executor service.
+     * @return the configured executor service, or <tt>null</tt> if none was configured.
+     * @throws IllegalArgumentException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found
+     *                                  or the found instance is not a ScheduledExecutorService type.
+     */
+    public static ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext, String name,
+                                                               ExecutorServiceAwareDefinition definition) throws IllegalArgumentException {
+        ExecutorServiceStrategy strategy = routeContext.getCamelContext().getExecutorServiceStrategy();
+        ObjectHelper.notNull(strategy, "ExecutorServiceStrategy", routeContext.getCamelContext());
+
+        // prefer to use explicit configured executor on the definition
+        if (definition.getExecutorService() != null) {
+            ExecutorService executorService = definition.getExecutorService();
+            if (executorService instanceof ScheduledExecutorService) {
+                return (ScheduledExecutorService) executorService;
+            }
+            throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance");
+        } else if (definition.getExecutorServiceRef() != null) {
+            ScheduledExecutorService answer = strategy.lookupScheduled(definition, name, definition.getExecutorServiceRef());
+            if (answer == null) {
+                throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry.");
+            }
+            return answer;
+        }
+
+        return null;
+    }
+
 }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java (from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java&r1=959949&r2=959977&rev=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java Fri Jul  2 13:14:59 2010
@@ -23,7 +23,7 @@ import org.apache.camel.component.mock.M
 /**
  * @version $Revision$
  */
-public class DelayerTest extends ContextTestSupport {
+public class DelayerAsyncDelayedTest extends ContextTestSupport {
 
     public void testSendingMessageGetsDelayed() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -54,13 +54,13 @@ public class DelayerTest extends Context
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: ex
-                from("seda:a").delay().header("MyDelay").to("mock:result");
+                from("seda:a").delay().header("MyDelay").asyncDelayed().to("mock:result");
                 // END SNIPPET: ex
 
                 // START SNIPPET: ex2
-                from("seda:b").delay(1000).to("mock:result");
+                from("seda:b").delay(1000).asyncDelayed().to("mock:result");
                 // END SNIPPET: ex2
             }
         };
     }
-}
+}
\ No newline at end of file

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java (from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java&r1=959949&r2=959977&rev=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java Fri Jul  2 13:14:59 2010
@@ -22,14 +22,13 @@ import java.util.concurrent.Executors;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.Throttler.TimeSlot;
 
 /**
  * @version $Revision$
  */
-public class ThrottlerTest extends ContextTestSupport {
+public class ThrottlerAsyncDelayedTest extends ContextTestSupport {
     private static final int INTERVAL = 500;
-    protected int messageCount = 6;
+    protected int messageCount = 9;
 
     public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -44,7 +43,7 @@ public class ThrottlerTest extends Conte
         // to check that the throttle really does kick in
         resultEndpoint.assertIsSatisfied();
     }
-    
+
     public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
         long start = System.currentTimeMillis();
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -55,41 +54,26 @@ public class ThrottlerTest extends Conte
             executor.execute(new Runnable() {
                 public void run() {
                     template.sendBody("direct:a", "<message>payload</message>");
-                }                
+                }
             });
         }
-        
+
         // let's wait for the exchanges to arrive
         resultEndpoint.assertIsSatisfied();
-        
+
         // now assert that they have actually been throttled
         long minimumTime = (messageCount - 1) * INTERVAL;
         assertTrue("Should take at least " + minimumTime + "ms", System.currentTimeMillis() - start >= minimumTime);
     }
-    
-    public void testTimeSlotCalculus() throws Exception {
-        Throttler throttler = new Throttler(null, 2, 1000);
-        TimeSlot slot = throttler.nextSlot();
-        // start a new time slot
-        assertNotNull(slot);
-        // make sure the same slot is used (2 exchanges per slot)
-        assertSame(slot, throttler.nextSlot());
-        assertTrue(slot.isFull());
-        
-        TimeSlot next = throttler.nextSlot();
-        // now we should have a new slot that starts somewhere in the future
-        assertNotSame(slot, next);
-        assertFalse(next.isActive());
-    }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: ex
-                from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result");
+                from("seda:a").throttle(3).timePeriodMillis(10000).asyncDelayed().to("log:result", "mock:result");
                 // END SNIPPET: ex
-                
-                from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("mock:result");
+
+                from("direct:a").throttle(1).timePeriodMillis(INTERVAL).asyncDelayed().to("log:result", "mock:result");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Fri Jul  2 13:14:59 2010
@@ -29,7 +29,7 @@ import org.apache.camel.processor.Thrott
  */
 public class ThrottlerTest extends ContextTestSupport {
     private static final int INTERVAL = 500;
-    protected int messageCount = 6;
+    protected int messageCount = 9;
 
     public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -68,7 +68,7 @@ public class ThrottlerTest extends Conte
     }
     
     public void testTimeSlotCalculus() throws Exception {
-        Throttler throttler = new Throttler(null, 2, 1000);
+        Throttler throttler = new Throttler(null, 2, 1000, null);
         TimeSlot slot = throttler.nextSlot();
         // start a new time slot
         assertNotNull(slot);
@@ -86,10 +86,10 @@ public class ThrottlerTest extends Conte
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: ex
-                from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result");
+                from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result");
                 // END SNIPPET: ex
                 
-                from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("mock:result");
+                from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Fri Jul  2 13:14:59 2010
@@ -34,6 +34,8 @@ log4j.logger.org.apache.activemq.spring=
 #log4j.logger.org.apache.camel.processor.RoutingSlip=TRACE
 #log4j.logger.org.apache.camel.processor.TryProcessor=TRACE
 #log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
+#log4j.logger.org.apache.camel.processor.Delayer=TRACE
+#log4j.logger.org.apache.camel.processor.Throttler=TRACE
 log4j.logger.org.apache.camel.impl.converter=WARN
 log4j.logger.org.apache.camel.management=WARN
 log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN