You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/23 13:37:51 UTC

svn commit: r1234765 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main...

Author: davsclaus
Date: Mon Jan 23 12:37:51 2012
New Revision: 1234765

URL: http://svn.apache.org/viewvc?rev=1234765&view=rev
Log:
CAMEL-4925: Threads EIP now better supports rejected tasks to ensure exchange gets done, for example to de-register the exchange in the inflight registry. Thanks to Sergey Zhemzhitsky for the patch.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Rejectable.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableFutureTask.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/SpringCamelContextThreadPoolProfilesTest.java

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/Rejectable.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Rejectable.java?rev=1234765&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Rejectable.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Rejectable.java Mon Jan 23 12:37:51 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Implemented by a task that wants to be told when it is rejected instead of being executed or processed.
+ */
+public interface Rejectable {
+
+    /**
+     * Callback invoked to tell this task that it was rejected.
+     */
+    void reject();
+
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java Mon Jan 23 12:37:51 2012
@@ -29,8 +29,6 @@ import javax.xml.bind.annotation.XmlType
  * a new task.
  * <p/>
  * Camel will by default use <tt>CallerRuns</tt>.
- *
- * @version 
  */
 @XmlType
 @XmlEnum(String.class)
@@ -40,13 +38,74 @@ public enum ThreadPoolRejectedPolicy {
 
+    /**
+     * Creates the {@link RejectedExecutionHandler} that implements this policy.
+     * <p/>
+     * With <tt>Abort</tt>, <tt>Discard</tt> and <tt>DiscardOldest</tt> a task that implements
+     * {@link Rejectable} is told about its rejection through {@link Rejectable#reject()}.
+     * For any other task <tt>Abort</tt> throws a {@link java.util.concurrent.RejectedExecutionException},
+     * as the {@link ThreadPoolExecutor.AbortPolicy} it replaces does.
+     *
+     * @return a new handler whose <tt>toString()</tt> is the name of this policy
+     * @throws IllegalArgumentException if no handler is defined for this policy
+     */
     public RejectedExecutionHandler asRejectedExecutionHandler() {
         if (this == Abort) {
-            return new ThreadPoolExecutor.AbortPolicy();
+            return new RejectedExecutionHandler() {
+                @Override
+                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                    if (r instanceof Rejectable) {
+                        ((Rejectable) r).reject();
+                    } else {
+                        // a task that cannot be notified must not vanish silently when aborted
+                        throw new java.util.concurrent.RejectedExecutionException("Task " + r + " rejected from " + executor);
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "Abort";
+                }
+            };
         } else if (this == CallerRuns) {
-            return new ThreadPoolExecutor.CallerRunsPolicy();
+            return new ThreadPoolExecutor.CallerRunsPolicy() {
+                @Override
+                public String toString() {
+                    return "CallerRuns";
+                }
+            };
         } else if (this == DiscardOldest) {
-            return new ThreadPoolExecutor.DiscardOldestPolicy();
+            return new RejectedExecutionHandler() {
+                @Override
+                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                    // make room by dropping the oldest queued task (telling it so when possible), then retry r
+                    if (!executor.isShutdown()) {
+                        Runnable rejected = executor.getQueue().poll();
+                        if (rejected instanceof Rejectable) {
+                            ((Rejectable) rejected).reject();
+                        }
+                        executor.execute(r);
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "DiscardOldest";
+                }
+            };
         } else if (this == Discard) {
-            return new ThreadPoolExecutor.DiscardPolicy();
+            return new RejectedExecutionHandler() {
+                @Override
+                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                    // discarding never throws; the task is only told about it when it is Rejectable
+                    if (r instanceof Rejectable) {
+                        ((Rejectable) r).reject();
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "Discard";
+                }
+            };
         }
         throw new IllegalArgumentException("Unknown ThreadPoolRejectedPolicy: " + this);
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java Mon Jan 23 12:37:51 2012
@@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.spi.ThreadPoolFactory;
 import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
+import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
 import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
 
 /**
@@ -82,7 +84,7 @@ public class DefaultThreadPoolFactory im
             workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
         }
 
-        ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
+        ThreadPoolExecutor answer = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
         answer.setThreadFactory(threadFactory);
         if (rejectedExecutionHandler == null) {
             rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
@@ -98,7 +100,7 @@ public class DefaultThreadPoolFactory im
             rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
         }
 
-        ScheduledThreadPoolExecutor answer = new ScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
+        ScheduledThreadPoolExecutor answer = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
         // TODO: when JDK7 we should setRemoveOnCancelPolicy(true)
 
         // need to wrap the thread pool in a sized to guard against the problem that the

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Mon Jan 23 12:37:51 2012
@@ -19,6 +19,7 @@ package org.apache.camel.model;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.xml.bind.annotation.XmlAccessType;
@@ -102,6 +103,7 @@ public class ThreadsDefinition extends O
         } else {
             thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
         }
+        thread.setRejectedPolicy(getRejectedPolicy());
 
         List<Processor> pipe = new ArrayList<Processor>(2);
         pipe.add(thread);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Mon Jan 23 12:37:51 2012
@@ -24,23 +24,41 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.Rejectable;
+import org.apache.camel.ThreadPoolRejectedPolicy;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Threads processor that leverage a thread pool for continue processing the {@link Exchange}s
  * using the asynchronous routing engine.
- *
- * @version 
+ * <p/>
+ * Pay attention to how this processor handles rejected tasks.
+ * <ul>
+ * <li>Abort - A {@link RejectedExecutionException} will be set on the current exchange,
+ * and the exchange will be marked to stop routing.
+ * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>failed</b>, due to the exception.</li>
+ * <li>Discard - The current exchange will be marked to stop routing (notice no exception is set).
+ * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, as no exception is set.</li>
+ * <li>DiscardOldest - The oldest exchange will be marked to stop routing (notice no exception is set).
+ * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, as no exception is set.
+ * The current exchange will then be added to the task queue.</li>
+ * <li>CallerRuns - The current exchange will be processed by the current thread, which means the current thread
+ * will not be free to process a new exchange while it is processing the current one.</li>
+ * </ul>
  */
 public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadsProcessor.class);
     private final ExecutorService executorService;
     private final AtomicBoolean shutdown = new AtomicBoolean(true);
     private boolean callerRunsWhenRejected = true;
+    private ThreadPoolRejectedPolicy rejectedPolicy;
 
-    private final class ProcessCall implements Runnable {
+    private final class ProcessCall implements Runnable, Rejectable {
         private final Exchange exchange;
         private final AsyncCallback callback;
 
@@ -49,12 +67,38 @@ public class ThreadsProcessor extends Se
             this.callback = callback;
         }
 
+        @Override
         public void run() {
+            LOG.trace("Continue routing exchange {} ", exchange);
             if (shutdown.get()) {
                 exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
             }
             callback.done(false);
         }
+
+        @Override
+        public void reject() {
+            // only the Abort policy marks the exchange as failed with a rejected-execution exception
+            boolean abort = ThreadPoolRejectedPolicy.Abort == rejectedPolicy;
+            if (abort) {
+                exchange.setException(new RejectedExecutionException());
+            }
+
+            LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange);
+            // the rejected exchange must not continue routing, and no redelivery should be attempted
+            exchange.setProperty(Exchange.ROUTE_STOP, true);
+            exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, true);
+
+            if (shutdown.get()) { // when the shutdown flag is set this replaces any exception set above
+                exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
+            }
+            callback.done(false); // always signal the callback so the rejected exchange gets done
+        }
+
+        @Override
+        public String toString() {
+            return "ProcessCall[" + exchange + "]"; // the text shown when this call is passed to the LOG.trace statements
+        }
     }
 
     public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService) {
@@ -74,19 +118,21 @@ public class ThreadsProcessor extends Se
 
         ProcessCall call = new ProcessCall(exchange, callback);
         try {
+            LOG.trace("Submitting task {}", call);
             executorService.submit(call);
             // tell Camel routing engine we continue routing asynchronous
             return false;
         } catch (RejectedExecutionException e) {
-            if (isCallerRunsWhenRejected()) {
-                if (shutdown.get()) {
-                    exchange.setException(new RejectedExecutionException());
-                } else {
-                    callback.done(true);
-                }
-            } else {
+            boolean callerRuns = isCallerRunsWhenRejected();
+            if (!callerRuns) {
                 exchange.setException(e);
             }
+
+            LOG.trace("{} executing task {}", callerRuns ? "CallerRuns" : "Aborted", call);
+            if (shutdown.get()) {
+                exchange.setException(new RejectedExecutionException());
+            }
+            callback.done(true);
             return true;
         }
     }
@@ -99,6 +145,14 @@ public class ThreadsProcessor extends Se
         this.callerRunsWhenRejected = callerRunsWhenRejected;
     }
 
+    public ThreadPoolRejectedPolicy getRejectedPolicy() {
+        return rejectedPolicy; // null unless set; the visible code assigns no default to this field
+    }
+
+    public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
+        this.rejectedPolicy = rejectedPolicy; // ProcessCall.reject() compares this against Abort to decide whether to fail the exchange
+    }
+
     public String toString() {
         return "Threads";
     }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableFutureTask.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableFutureTask.java?rev=1234765&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableFutureTask.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableFutureTask.java Mon Jan 23 12:37:51 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import org.apache.camel.Rejectable;
+
+/**
+ * A {@link Rejectable} {@link FutureTask}, created by {@link RejectableThreadPoolExecutor}, that passes {@link #reject()} on to the task it wraps.
+ *
+ * @see RejectableThreadPoolExecutor
+ */
+public class RejectableFutureTask<V> extends FutureTask<V> implements Rejectable {
+
+    private final Rejectable rejectable; // the wrapped task when it implements Rejectable, otherwise null
+
+    public RejectableFutureTask(Callable<V> callable) {
+        super(callable);
+        this.rejectable = callable instanceof Rejectable ? (Rejectable) callable : null;
+    }
+
+    public RejectableFutureTask(Runnable runnable, V result) {
+        super(runnable, result);
+        this.rejectable = runnable instanceof Rejectable ? (Rejectable) runnable : null;
+    }
+
+    @Override
+    public void reject() { // NOTE(review): only notifies the wrapped task; this future is neither completed nor cancelled here - confirm no caller waits on get()
+        if (rejectable != null) { // null means the wrapped task is not Rejectable, so there is nothing to notify
+            rejectable.reject();
+        }
+    }
+
+}

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java?rev=1234765&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java Mon Jan 23 12:37:51 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Scheduled thread pool executor that creates {@link RejectableFutureTask} instead of
+ * {@link java.util.concurrent.FutureTask} when registering new tasks for execution.
+ * <p/>
+ * Instances of {@link RejectableFutureTask} are required to handle {@link org.apache.camel.ThreadPoolRejectedPolicy#Discard}
+ * and {@link org.apache.camel.ThreadPoolRejectedPolicy#DiscardOldest} policies correctly, i.e. to notify
+ * {@link Callable} and {@link Runnable} tasks when they are rejected.
+ * To be notified of a rejection, tasks have to implement the {@link org.apache.camel.Rejectable} interface: <br/>
+ * <code><pre>
+ * public class RejectableTask implements Runnable, Rejectable {
+ *     &#064;Override
+ *     public void run() {
+ *         // execute task
+ *     }
+ *     &#064;Override
+ *     public void reject() {
+ *         // do something useful on rejection
+ *     }
+ * }
+ * </pre></code>
+ * <p/>
+ * If the task does not implement the {@link org.apache.camel.Rejectable} interface, the behavior is exactly the same as with
+ * an ordinary {@link ScheduledThreadPoolExecutor}.
+ *
+ * @see RejectableFutureTask
+ * @see org.apache.camel.Rejectable
+ * @see RejectableThreadPoolExecutor
+ */
+public class RejectableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
+
+    public RejectableScheduledThreadPoolExecutor(int corePoolSize) {
+        super(corePoolSize);
+    }
+
+    public RejectableScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
+        super(corePoolSize, threadFactory);
+    }
+
+    public RejectableScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
+        super(corePoolSize, handler);
+    }
+
+    public RejectableScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
+        super(corePoolSize, threadFactory, handler);
+    }
+
+    @Override // NOTE(review): the JDK documents decorateTask(..), not newTaskFor(..), as the ScheduledThreadPoolExecutor hook for execute/submit/schedule - verify this override is reached
+    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+        return new RejectableFutureTask<T>(runnable, value); // wraps the task so a rejection can be passed on to it
+    }
+
+    @Override // NOTE(review): same concern as the Runnable variant above - confirm against the ScheduledThreadPoolExecutor documentation
+    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+        return new RejectableFutureTask<T>(callable); // wraps the task so a rejection can be passed on to it
+    }
+
+}

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java?rev=1234765&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java Mon Jan 23 12:37:51 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thread pool executor that creates {@link RejectableFutureTask} instead of
+ * {@link java.util.concurrent.FutureTask} when registering new tasks for execution.
+ * <p/>
+ * Instances of {@link RejectableFutureTask} are required to handle {@link org.apache.camel.ThreadPoolRejectedPolicy#Discard}
+ * and {@link org.apache.camel.ThreadPoolRejectedPolicy#DiscardOldest} policies correctly, i.e. to notify
+ * {@link Callable} and {@link Runnable} tasks when they are rejected.
+ * To be notified of a rejection, tasks have to implement the {@link org.apache.camel.Rejectable} interface: <br/>
+ * <code><pre>
+ * public class RejectableTask implements Runnable, Rejectable {
+ *     &#064;Override
+ *     public void run() {
+ *         // execute task
+ *     }
+ *     &#064;Override
+ *     public void reject() {
+ *         // do something useful on rejection
+ *     }
+ * }
+ * </pre></code>
+ * <p/>
+ * If the task does not implement the {@link org.apache.camel.Rejectable} interface, the behavior is exactly the same as with
+ * an ordinary {@link ThreadPoolExecutor}.
+ *
+ * @see RejectableFutureTask
+ * @see org.apache.camel.Rejectable
+ * @see RejectableScheduledThreadPoolExecutor
+ */
+public class RejectableThreadPoolExecutor extends ThreadPoolExecutor {
+
+    public RejectableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+                                        BlockingQueue<Runnable> workQueue) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    }
+
+    public RejectableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+    }
+
+    public RejectableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+    }
+
+    public RejectableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+        return new RejectableFutureTask<T>(runnable, value); // wraps the task so a rejection handler can reach reject() on it
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+        return new RejectableFutureTask<T>(callable); // wraps the task so a rejection handler can reach reject() on it
+    }
+
+}

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java Mon Jan 23 12:37:51 2012
@@ -237,7 +237,7 @@ public class DefaultExecutorServiceManag
         // should inherit the default values
         assertEquals(10, tp.getCorePoolSize());
         assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
-        assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+        assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString());
     }
 
     public void testGetThreadPoolProfileInheritCustomDefaultValues() throws Exception {
@@ -263,7 +263,7 @@ public class DefaultExecutorServiceManag
         // should inherit the default values
         assertEquals(1, tp.getCorePoolSize());
         assertEquals(30, tp.getKeepAliveTime(TimeUnit.SECONDS));
-        assertIsInstanceOf(ThreadPoolExecutor.AbortPolicy.class, tp.getRejectedExecutionHandler());
+        assertEquals("Abort", tp.getRejectedExecutionHandler().toString());
     }
 
     public void testGetThreadPoolProfileInheritCustomDefaultValues2() throws Exception {
@@ -285,7 +285,7 @@ public class DefaultExecutorServiceManag
         // should inherit the default values
         assertEquals(50, tp.getMaximumPoolSize());
         assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
-        assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+        assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString());
     }
 
     public void testNewThreadPoolProfile() throws Exception {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java Mon Jan 23 12:37:51 2012
@@ -239,7 +239,7 @@ public class DefaultExecutorServiceStrat
         // should inherit the default values
         assertEquals(10, tp.getCorePoolSize());
         assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
-        assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+        assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString());
     }
 
     public void testGetThreadPoolProfileInheritCustomDefaultValues() throws Exception {
@@ -265,7 +265,7 @@ public class DefaultExecutorServiceStrat
         // should inherit the default values
         assertEquals(1, tp.getCorePoolSize());
         assertEquals(30, tp.getKeepAliveTime(TimeUnit.SECONDS));
-        assertIsInstanceOf(ThreadPoolExecutor.AbortPolicy.class, tp.getRejectedExecutionHandler());
+        assertEquals("Abort", tp.getRejectedExecutionHandler().toString());
     }
 
     public void testGetThreadPoolProfileInheritCustomDefaultValues2() throws Exception {
@@ -287,7 +287,7 @@ public class DefaultExecutorServiceStrat
         // should inherit the default values
         assertEquals(50, tp.getMaximumPoolSize());
         assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
-        assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+        assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString());
     }
 
     public void testNewThreadPoolProfile() throws Exception {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java Mon Jan 23 12:37:51 2012
@@ -22,6 +22,8 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
@@ -43,8 +45,6 @@ public class ThreadsRejectedExecutionTes
                 // this should force the ThreadsProcessor to run the tasks itself
                 ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
 
-                context.setTracing(true);
-
                 from("seda:start")
                     .to("log:before")
                     // will use our custom pool
@@ -73,8 +73,6 @@ public class ThreadsRejectedExecutionTes
                 // this should force the ThreadsProcessor to run the tasks itself
                 ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
 
-                context.setTracing(true);
-
                 from("seda:start")
                     .to("log:before")
                     // will use our custom pool
@@ -102,4 +100,150 @@ public class ThreadsRejectedExecutionTes
         assertEquals(1, mock.getReceivedCounter());
     }
 
+    public void testThreadsRejectedDiscard() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                        .to("log:before")
+                        .threads(1, 1).maxPoolSize(1).maxQueueSize(2).rejectedPolicy(ThreadPoolRejectedPolicy.Discard)
+                        .delay(1000)
+                        .to("log:after")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create();
+        
+        getMockEndpoint("mock:result").expectedMinimumMessageCount(2);
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:start", "Message " + i);
+        }
+        assertMockEndpointsSatisfied();
+
+        assertTrue(notify.matchesMockWaitTime());
+
+        int inflight = context.getInflightRepository().size();
+        assertEquals(0, inflight);
+    }
+    
+    public void testThreadsRejectedDiscardOldest() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                        .to("log:before")
+                        .threads(1, 1).maxPoolSize(1).maxQueueSize(2).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
+                        .delay(1000)
+                        .to("log:after")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create();
+
+        getMockEndpoint("mock:result").expectedMinimumMessageCount(2);
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:start", "Message " + i);
+        }
+        assertMockEndpointsSatisfied();
+
+        assertTrue(notify.matchesMockWaitTime());
+
+        int inflight = context.getInflightRepository().size();
+        assertEquals(0, inflight);
+    }
+
+    public void testThreadsRejectedAbort() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                        .to("log:before")
+                        .threads(1, 1).maxPoolSize(1).maxQueueSize(2).rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+                        .delay(1000)
+                        .to("log:after")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create();
+
+        getMockEndpoint("mock:result").expectedMinimumMessageCount(2);
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:start", "Message " + i);
+        }
+        assertMockEndpointsSatisfied();
+
+        assertTrue(notify.matchesMockWaitTime());
+
+        int inflight = context.getInflightRepository().size();
+        assertEquals(0, inflight);
+    }
+
+    public void testThreadsRejectedCallerRuns() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start")
+                        .to("log:before")
+                        .threads(1, 1).maxPoolSize(1).maxQueueSize(2).rejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns)
+                        .delay(200)
+                        .to("log:after")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create();
+
+        getMockEndpoint("mock:result").expectedMessageCount(10);
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:start", "Message " + i);
+        }
+        assertMockEndpointsSatisfied();
+
+        assertTrue(notify.matchesMockWaitTime());
+
+        int inflight = context.getInflightRepository().size();
+        assertEquals(0, inflight);
+    }
+
+    public void testThreadsRejectedAbortNoRedelivery() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class).maximumRedeliveries(3).handled(true).to("mock:error");
+
+                from("seda:start")
+                        .to("log:before")
+                        .threads(1, 1).maxPoolSize(1).maxQueueSize(2).rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+                        .delay(1000)
+                        .to("log:after")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create();
+
+        // there should be no error handling for aborted tasks (eg no redeliveries and no onException handling)
+        getMockEndpoint("mock:error").expectedMessageCount(0);
+
+        getMockEndpoint("mock:result").expectedMinimumMessageCount(2);
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:start", "Message " + i);
+        }
+        assertMockEndpointsSatisfied();
+
+        assertTrue(notify.matchesMockWaitTime());
+
+        int inflight = context.getInflightRepository().size();
+        assertEquals(0, inflight);
+    }
+
+    
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java Mon Jan 23 12:37:51 2012
@@ -65,7 +65,7 @@ public class PollEnricherRefTest extends
             public void configure() throws Exception {
                 cool.setEndpointUriIfNotSpecified("cool");
 
-                from("direct:start").pollEnrichRef("cool", 1000, "agg");
+                from("direct:start").pollEnrichRef("cool", 2000, "agg");
             }
         };
     }

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/SpringCamelContextThreadPoolProfilesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/SpringCamelContextThreadPoolProfilesTest.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/SpringCamelContextThreadPoolProfilesTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/SpringCamelContextThreadPoolProfilesTest.java Mon Jan 23 12:37:51 2012
@@ -50,7 +50,7 @@ public class SpringCamelContextThreadPoo
         assertEquals(5, tp.getMaximumPoolSize());
         // should inherit default options
         assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
-        assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+        assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString());
     }
 
     public void testBigProfile() throws Exception {
@@ -70,7 +70,7 @@ public class SpringCamelContextThreadPoo
         assertEquals(100, tp.getMaximumPoolSize());
         // should inherit default options
         assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
-        assertIsInstanceOf(ThreadPoolExecutor.DiscardOldestPolicy.class, tp.getRejectedExecutionHandler());
+        assertEquals("DiscardOldest", tp.getRejectedExecutionHandler().toString());
     }
 
 }
\ No newline at end of file