You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2008/05/20 17:12:01 UTC
svn commit: r658273 -
/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
Author: chirino
Date: Tue May 20 08:12:01 2008
New Revision: 658273
URL: http://svn.apache.org/viewvc?rev=658273&view=rev
Log:
Fix for http://www.nabble.com/I-think-I-should-be-able-to-refer-to-an-endpoint-by-name-w-o-knowing-the-URL-parameters-td15994537s22882.html#a17284142
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java?rev=658273&r1=658272&r2=658273&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java Tue May 20 08:12:01 2008
@@ -16,11 +16,9 @@
*/
package org.apache.camel.processor;
-import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -50,6 +48,7 @@
private int maxSize = 1;
private int coreSize = 1;
private final AtomicBoolean shutdown = new AtomicBoolean(true);
+ private boolean callerRunsWhenRejected = true;
class ProcessCall implements Runnable {
private final Exchange exchange;
@@ -63,10 +62,8 @@
public void run() {
if (shutdown.get()) {
exchange.setException(new RejectedExecutionException());
- callback.done(false);
- } else {
- callback.done(false);
}
+ callback.done(false);
}
}
@@ -79,19 +76,26 @@
throw new IllegalStateException("ThreadProcessor is not running.");
}
ProcessCall call = new ProcessCall(exchange, callback);
- executor.execute(call);
- return false;
+ try {
+ executor.execute(call);
+ return false;
+ } catch ( RejectedExecutionException e ) {
+ if( callerRunsWhenRejected ) {
+ if (shutdown.get()) {
+ exchange.setException(new RejectedExecutionException());
+ } else {
+ callback.done(true);
+ }
+ } else {
+ exchange.setException(e);
+ }
+ return true;
+ }
}
public void start() throws Exception {
shutdown.set(false);
- getExecutor().setRejectedExecutionHandler(new RejectedExecutionHandler() {
- public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
- ProcessCall call = (ProcessCall)runnable;
- call.exchange.setException(new RejectedExecutionException());
- call.callback.done(false);
- }
- });
+ getExecutor();
}
public void stop() throws Exception {
@@ -198,4 +202,12 @@
this.executor = executor;
}
+ public boolean isCallerRunsWhenRejected() {
+ return callerRunsWhenRejected;
+ }
+
+ public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
+ this.callerRunsWhenRejected = callerRunsWhenRejected;
+ }
+
}