You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/23 13:37:51 UTC
svn commit: r1234765 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main...
Author: davsclaus
Date: Mon Jan 23 12:37:51 2012
New Revision: 1234765
URL: http://svn.apache.org/viewvc?rev=1234765&view=rev
Log:
CAMEL-4925: Threads EIP now better suppports rejected tasks to ensure exchange gets done, for example to de-register the exchange in the inflight registry. Thanks to Sergey Zhemzhitsky for the patch.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/Rejectable.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableFutureTask.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/SpringCamelContextThreadPoolProfilesTest.java
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/Rejectable.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Rejectable.java?rev=1234765&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Rejectable.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Rejectable.java Mon Jan 23 12:37:51 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Reject executing or processing some task.
+ */
+public interface Rejectable {
+
+ /**
+ * The task was rejected.
+ */
+ void reject();
+
+}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java Mon Jan 23 12:37:51 2012
@@ -29,8 +29,6 @@ import javax.xml.bind.annotation.XmlType
* a new task.
* <p/>
* Camel will by default use <tt>CallerRuns</tt>.
- *
- * @version
*/
@XmlType
@XmlEnum(String.class)
@@ -40,13 +38,58 @@ public enum ThreadPoolRejectedPolicy {
public RejectedExecutionHandler asRejectedExecutionHandler() {
if (this == Abort) {
- return new ThreadPoolExecutor.AbortPolicy();
+ return new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ if (r instanceof Rejectable) {
+ ((Rejectable) r).reject();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Abort";
+ }
+ };
} else if (this == CallerRuns) {
- return new ThreadPoolExecutor.CallerRunsPolicy();
+ return new ThreadPoolExecutor.CallerRunsPolicy() {
+ @Override
+ public String toString() {
+ return "CallerRuns";
+ }
+ };
} else if (this == DiscardOldest) {
- return new ThreadPoolExecutor.DiscardOldestPolicy();
+ return new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ if (!executor.isShutdown()) {
+ Runnable rejected = executor.getQueue().poll();
+ if (rejected instanceof Rejectable) {
+ ((Rejectable) rejected).reject();
+ }
+ executor.execute(r);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "DiscardOldest";
+ }
+ };
} else if (this == Discard) {
- return new ThreadPoolExecutor.DiscardPolicy();
+ return new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ if (r instanceof Rejectable) {
+ ((Rejectable) r).reject();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Discard";
+ }
+ };
}
throw new IllegalArgumentException("Unknown ThreadPoolRejectedPolicy: " + this);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java Mon Jan 23 12:37:51 2012
@@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
+import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
/**
@@ -82,7 +84,7 @@ public class DefaultThreadPoolFactory im
workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
- ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
+ ThreadPoolExecutor answer = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
answer.setThreadFactory(threadFactory);
if (rejectedExecutionHandler == null) {
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
@@ -98,7 +100,7 @@ public class DefaultThreadPoolFactory im
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
}
- ScheduledThreadPoolExecutor answer = new ScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
+ ScheduledThreadPoolExecutor answer = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
// TODO: when JDK7 we should setRemoveOnCancelPolicy(true)
// need to wrap the thread pool in a sized to guard against the problem that the
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Mon Jan 23 12:37:51 2012
@@ -19,6 +19,7 @@ package org.apache.camel.model;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.annotation.XmlAccessType;
@@ -102,6 +103,7 @@ public class ThreadsDefinition extends O
} else {
thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
}
+ thread.setRejectedPolicy(getRejectedPolicy());
List<Processor> pipe = new ArrayList<Processor>(2);
pipe.add(thread);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Mon Jan 23 12:37:51 2012
@@ -24,23 +24,41 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.Rejectable;
+import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Threads processor that leverage a thread pool for continue processing the {@link Exchange}s
* using the asynchronous routing engine.
- *
- * @version
+ * <p/>
+ * Pay attention to how this processor handles rejected tasks.
+ * <ul>
+ * <li>Abort - The current exchange will be set with a {@link RejectedExecutionException} exception,
+ * and marked to stop continue routing.
+ * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>failed</b>, due the exception.</li>
+ * <li>Discard - The current exchange will be marked to stop continue routing (notice no exception is set).
+ * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set.</li>
+ * <li>DiscardOldest - The oldest exchange will be marked to stop continue routing (notice no exception is set).
+ * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set.
+ * And the current exchange will be added to the task queue.</li>
+ * <li>CallerRuns - The current exchange will be processed by the current thread. Which mean the current thread
+ * will not be free to process a new exchange, as its processing the current exchange.</li>
+ * </ul>
*/
public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor {
+ private static final Logger LOG = LoggerFactory.getLogger(ThreadsProcessor.class);
private final ExecutorService executorService;
private final AtomicBoolean shutdown = new AtomicBoolean(true);
private boolean callerRunsWhenRejected = true;
+ private ThreadPoolRejectedPolicy rejectedPolicy;
- private final class ProcessCall implements Runnable {
+ private final class ProcessCall implements Runnable, Rejectable {
private final Exchange exchange;
private final AsyncCallback callback;
@@ -49,12 +67,38 @@ public class ThreadsProcessor extends Se
this.callback = callback;
}
+ @Override
public void run() {
+ LOG.trace("Continue routing exchange {} ", exchange);
if (shutdown.get()) {
exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
}
callback.done(false);
}
+
+ @Override
+ public void reject() {
+ // abort should mark the exchange with an rejected exception
+ boolean abort = ThreadPoolRejectedPolicy.Abort == rejectedPolicy;
+ if (abort) {
+ exchange.setException(new RejectedExecutionException());
+ }
+
+ LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange);
+ // we should not continue routing, and no redelivery should be performed
+ exchange.setProperty(Exchange.ROUTE_STOP, true);
+ exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, true);
+
+ if (shutdown.get()) {
+ exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
+ }
+ callback.done(false);
+ }
+
+ @Override
+ public String toString() {
+ return "ProcessCall[" + exchange + "]";
+ }
}
public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService) {
@@ -74,19 +118,21 @@ public class ThreadsProcessor extends Se
ProcessCall call = new ProcessCall(exchange, callback);
try {
+ LOG.trace("Submitting task {}", call);
executorService.submit(call);
// tell Camel routing engine we continue routing asynchronous
return false;
} catch (RejectedExecutionException e) {
- if (isCallerRunsWhenRejected()) {
- if (shutdown.get()) {
- exchange.setException(new RejectedExecutionException());
- } else {
- callback.done(true);
- }
- } else {
+ boolean callerRuns = isCallerRunsWhenRejected();
+ if (!callerRuns) {
exchange.setException(e);
}
+
+ LOG.trace("{} executing task {}", callerRuns ? "CallerRuns" : "Aborted", call);
+ if (shutdown.get()) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ callback.done(true);
return true;
}
}
@@ -99,6 +145,14 @@ public class ThreadsProcessor extends Se
this.callerRunsWhenRejected = callerRunsWhenRejected;
}
+ public ThreadPoolRejectedPolicy getRejectedPolicy() {
+ return rejectedPolicy;
+ }
+
+ public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
+ this.rejectedPolicy = rejectedPolicy;
+ }
+
public String toString() {
return "Threads";
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableFutureTask.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableFutureTask.java?rev=1234765&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableFutureTask.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableFutureTask.java Mon Jan 23 12:37:51 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import org.apache.camel.Rejectable;
+
+/**
+ * A {@link Rejectable} {@link FutureTask} used by {@link RejectableThreadPoolExecutor}.
+ *
+ * @see RejectableThreadPoolExecutor
+ */
+public class RejectableFutureTask<V> extends FutureTask<V> implements Rejectable {
+
+ private final Rejectable rejectable;
+
+ public RejectableFutureTask(Callable<V> callable) {
+ super(callable);
+ this.rejectable = callable instanceof Rejectable ? (Rejectable) callable : null;
+ }
+
+ public RejectableFutureTask(Runnable runnable, V result) {
+ super(runnable, result);
+ this.rejectable = runnable instanceof Rejectable ? (Rejectable) runnable : null;
+ }
+
+ @Override
+ public void reject() {
+ if (rejectable != null) {
+ rejectable.reject();
+ }
+ }
+
+}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java?rev=1234765&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java Mon Jan 23 12:37:51 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Scheduled thread pool executor that creates {@link RejectableFutureTask} instead of
+ * {@link java.util.concurrent.FutureTask} when registering new tasks for execution.
+ * <p/>
+ * Instances of {@link RejectableFutureTask} are required to handle {@link org.apache.camel.ThreadPoolRejectedPolicy#Discard}
+ * and {@link org.apache.camel.ThreadPoolRejectedPolicy#DiscardOldest} policies correctly, e.g. notify
+ * {@link Callable} and {@link Runnable} tasks when they are rejected.
+ * To be notified of rejection tasks have to implement {@link org.apache.camel.Rejectable} interface: <br/>
+ * <code><pre>
+ * public class RejectableTask implements Runnable, Rejectable {
+ * @Override
+ * public void run() {
+ * // execute task
+ * }
+ * @Override
+ * public void reject() {
+ * // do something useful on rejection
+ * }
+ * }
+ * </pre></code>
+ * <p/>
+ * If the task does not implement {@link org.apache.camel.Rejectable} interface the behavior is exactly the same as with
+ * ordinary {@link ScheduledThreadPoolExecutor}.
+ *
+ * @see RejectableFutureTask
+ * @see org.apache.camel.Rejectable
+ * @see RejectableThreadPoolExecutor
+ */
+public class RejectableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
+
+ public RejectableScheduledThreadPoolExecutor(int corePoolSize) {
+ super(corePoolSize);
+ }
+
+ public RejectableScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
+ super(corePoolSize, threadFactory);
+ }
+
+ public RejectableScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
+ super(corePoolSize, handler);
+ }
+
+ public RejectableScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
+ super(corePoolSize, threadFactory, handler);
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ return new RejectableFutureTask<T>(runnable, value);
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ return new RejectableFutureTask<T>(callable);
+ }
+
+}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java?rev=1234765&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java Mon Jan 23 12:37:51 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thread pool executor that creates {@link RejectableFutureTask} instead of
+ * {@link java.util.concurrent.FutureTask} when registering new tasks for execution.
+ * <p/>
+ * Instances of {@link RejectableFutureTask} are required to handle {@link org.apache.camel.ThreadPoolRejectedPolicy#Discard}
+ * and {@link org.apache.camel.ThreadPoolRejectedPolicy#DiscardOldest} policies correctly, e.g. notify
+ * {@link Callable} and {@link Runnable} tasks when they are rejected.
+ * To be notified of rejection tasks have to implement {@link org.apache.camel.Rejectable} interface: <br/>
+ * <code><pre>
+ * public class RejectableTask implements Runnable, Rejectable {
+ * @Override
+ * public void run() {
+ * // execute task
+ * }
+ * @Override
+ * public void reject() {
+ * // do something useful on rejection
+ * }
+ * }
+ * </pre></code>
+ * <p/>
+ * If the task does not implement {@link org.apache.camel.Rejectable} interface the behavior is exactly the same as with
+ * ordinary {@link ThreadPoolExecutor}.
+ *
+ * @see RejectableFutureTask
+ * @see org.apache.camel.Rejectable
+ * @see RejectableScheduledThreadPoolExecutor
+ */
+public class RejectableThreadPoolExecutor extends ThreadPoolExecutor {
+
+ public RejectableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ }
+
+ public RejectableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ }
+
+ public RejectableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+ }
+
+ public RejectableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ return new RejectableFutureTask<T>(runnable, value);
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ return new RejectableFutureTask<T>(callable);
+ }
+
+}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java Mon Jan 23 12:37:51 2012
@@ -237,7 +237,7 @@ public class DefaultExecutorServiceManag
// should inherit the default values
assertEquals(10, tp.getCorePoolSize());
assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
- assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+ assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString());
}
public void testGetThreadPoolProfileInheritCustomDefaultValues() throws Exception {
@@ -263,7 +263,7 @@ public class DefaultExecutorServiceManag
// should inherit the default values
assertEquals(1, tp.getCorePoolSize());
assertEquals(30, tp.getKeepAliveTime(TimeUnit.SECONDS));
- assertIsInstanceOf(ThreadPoolExecutor.AbortPolicy.class, tp.getRejectedExecutionHandler());
+ assertEquals("Abort", tp.getRejectedExecutionHandler().toString());
}
public void testGetThreadPoolProfileInheritCustomDefaultValues2() throws Exception {
@@ -285,7 +285,7 @@ public class DefaultExecutorServiceManag
// should inherit the default values
assertEquals(50, tp.getMaximumPoolSize());
assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
- assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+ assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString());
}
public void testNewThreadPoolProfile() throws Exception {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceStrategyTest.java Mon Jan 23 12:37:51 2012
@@ -239,7 +239,7 @@ public class DefaultExecutorServiceStrat
// should inherit the default values
assertEquals(10, tp.getCorePoolSize());
assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
- assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+ assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString());
}
public void testGetThreadPoolProfileInheritCustomDefaultValues() throws Exception {
@@ -265,7 +265,7 @@ public class DefaultExecutorServiceStrat
// should inherit the default values
assertEquals(1, tp.getCorePoolSize());
assertEquals(30, tp.getKeepAliveTime(TimeUnit.SECONDS));
- assertIsInstanceOf(ThreadPoolExecutor.AbortPolicy.class, tp.getRejectedExecutionHandler());
+ assertEquals("Abort", tp.getRejectedExecutionHandler().toString());
}
public void testGetThreadPoolProfileInheritCustomDefaultValues2() throws Exception {
@@ -287,7 +287,7 @@ public class DefaultExecutorServiceStrat
// should inherit the default values
assertEquals(50, tp.getMaximumPoolSize());
assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
- assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+ assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString());
}
public void testNewThreadPoolProfile() throws Exception {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java Mon Jan 23 12:37:51 2012
@@ -22,6 +22,8 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -43,8 +45,6 @@ public class ThreadsRejectedExecutionTes
// this should force the ThreadsProcessor to run the tasks itself
ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
- context.setTracing(true);
-
from("seda:start")
.to("log:before")
// will use our custom pool
@@ -73,8 +73,6 @@ public class ThreadsRejectedExecutionTes
// this should force the ThreadsProcessor to run the tasks itself
ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
- context.setTracing(true);
-
from("seda:start")
.to("log:before")
// will use our custom pool
@@ -102,4 +100,150 @@ public class ThreadsRejectedExecutionTes
assertEquals(1, mock.getReceivedCounter());
}
+ public void testThreadsRejectedDiscard() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:start")
+ .to("log:before")
+ .threads(1, 1).maxPoolSize(1).maxQueueSize(2).rejectedPolicy(ThreadPoolRejectedPolicy.Discard)
+ .delay(1000)
+ .to("log:after")
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create();
+
+ getMockEndpoint("mock:result").expectedMinimumMessageCount(2);
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:start", "Message " + i);
+ }
+ assertMockEndpointsSatisfied();
+
+ assertTrue(notify.matchesMockWaitTime());
+
+ int inflight = context.getInflightRepository().size();
+ assertEquals(0, inflight);
+ }
+
+ public void testThreadsRejectedDiscardOldest() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:start")
+ .to("log:before")
+ .threads(1, 1).maxPoolSize(1).maxQueueSize(2).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
+ .delay(1000)
+ .to("log:after")
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create();
+
+ getMockEndpoint("mock:result").expectedMinimumMessageCount(2);
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:start", "Message " + i);
+ }
+ assertMockEndpointsSatisfied();
+
+ assertTrue(notify.matchesMockWaitTime());
+
+ int inflight = context.getInflightRepository().size();
+ assertEquals(0, inflight);
+ }
+
+ public void testThreadsRejectedAbort() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:start")
+ .to("log:before")
+ .threads(1, 1).maxPoolSize(1).maxQueueSize(2).rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+ .delay(1000)
+ .to("log:after")
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create();
+
+ getMockEndpoint("mock:result").expectedMinimumMessageCount(2);
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:start", "Message " + i);
+ }
+ assertMockEndpointsSatisfied();
+
+ assertTrue(notify.matchesMockWaitTime());
+
+ int inflight = context.getInflightRepository().size();
+ assertEquals(0, inflight);
+ }
+
+ public void testThreadsRejectedCallerRuns() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:start")
+ .to("log:before")
+ .threads(1, 1).maxPoolSize(1).maxQueueSize(2).rejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns)
+ .delay(200)
+ .to("log:after")
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create();
+
+ getMockEndpoint("mock:result").expectedMessageCount(10);
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:start", "Message " + i);
+ }
+ assertMockEndpointsSatisfied();
+
+ assertTrue(notify.matchesMockWaitTime());
+
+ int inflight = context.getInflightRepository().size();
+ assertEquals(0, inflight);
+ }
+
+ public void testThreadsRejectedAbortNoRedelivery() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(Exception.class).maximumRedeliveries(3).handled(true).to("mock:error");
+
+ from("seda:start")
+ .to("log:before")
+ .threads(1, 1).maxPoolSize(1).maxQueueSize(2).rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+ .delay(1000)
+ .to("log:after")
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create();
+
+ // there should be error handling for aborted tasks (eg no redeliveries and no error handling)
+ getMockEndpoint("mock:error").expectedMessageCount(0);
+
+ getMockEndpoint("mock:result").expectedMinimumMessageCount(2);
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:start", "Message " + i);
+ }
+ assertMockEndpointsSatisfied();
+
+ assertTrue(notify.matchesMockWaitTime());
+
+ int inflight = context.getInflightRepository().size();
+ assertEquals(0, inflight);
+ }
+
+
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherRefTest.java Mon Jan 23 12:37:51 2012
@@ -65,7 +65,7 @@ public class PollEnricherRefTest extends
public void configure() throws Exception {
cool.setEndpointUriIfNotSpecified("cool");
- from("direct:start").pollEnrichRef("cool", 1000, "agg");
+ from("direct:start").pollEnrichRef("cool", 2000, "agg");
}
};
}
Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/SpringCamelContextThreadPoolProfilesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/SpringCamelContextThreadPoolProfilesTest.java?rev=1234765&r1=1234764&r2=1234765&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/SpringCamelContextThreadPoolProfilesTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/SpringCamelContextThreadPoolProfilesTest.java Mon Jan 23 12:37:51 2012
@@ -50,7 +50,7 @@ public class SpringCamelContextThreadPoo
assertEquals(5, tp.getMaximumPoolSize());
// should inherit default options
assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
- assertIsInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class, tp.getRejectedExecutionHandler());
+ assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString());
}
public void testBigProfile() throws Exception {
@@ -70,7 +70,7 @@ public class SpringCamelContextThreadPoo
assertEquals(100, tp.getMaximumPoolSize());
// should inherit default options
assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS));
- assertIsInstanceOf(ThreadPoolExecutor.DiscardOldestPolicy.class, tp.getRejectedExecutionHandler());
+ assertEquals("DiscardOldest", tp.getRejectedExecutionHandler().toString());
}
}
\ No newline at end of file