You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2008/05/20 17:12:01 UTC

svn commit: r658273 - /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java

Author: chirino
Date: Tue May 20 08:12:01 2008
New Revision: 658273

URL: http://svn.apache.org/viewvc?rev=658273&view=rev
Log:
Fix for http://www.nabble.com/I-think-I-should-be-able-to-refer-to-an-endpoint-by-name-w-o-knowing-the-URL-parameters-td15994537s22882.html#a17284142

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java?rev=658273&r1=658272&r2=658273&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java Tue May 20 08:12:01 2008
@@ -16,11 +16,9 @@
  */
 package org.apache.camel.processor;
 
-import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -50,6 +48,7 @@
     private int maxSize = 1;
     private int coreSize = 1;
     private final AtomicBoolean shutdown = new AtomicBoolean(true);
+    private boolean callerRunsWhenRejected = true;
 
     class ProcessCall implements Runnable {
         private final Exchange exchange;
@@ -63,10 +62,8 @@
         public void run() {
             if (shutdown.get()) {
                 exchange.setException(new RejectedExecutionException());
-                callback.done(false);
-            } else {
-                callback.done(false);
             }
+            callback.done(false);
         }
     }
 
@@ -79,19 +76,26 @@
             throw new IllegalStateException("ThreadProcessor is not running.");
         }
         ProcessCall call = new ProcessCall(exchange, callback);
-        executor.execute(call);
-        return false;
+        try {
+            executor.execute(call);
+            return false;
+        } catch ( RejectedExecutionException e ) {
+            if( callerRunsWhenRejected ) {
+                if (shutdown.get()) {
+                    exchange.setException(new RejectedExecutionException());
+                } else {
+                    callback.done(true);
+                }
+            } else {
+                exchange.setException(e);
+            }
+            return true;
+        }
     }
 
     public void start() throws Exception {
         shutdown.set(false);
-        getExecutor().setRejectedExecutionHandler(new RejectedExecutionHandler() {
-            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
-                ProcessCall call = (ProcessCall)runnable;
-                call.exchange.setException(new RejectedExecutionException());
-                call.callback.done(false);
-            }
-        });
+        getExecutor();
     }
 
     public void stop() throws Exception {
@@ -198,4 +202,12 @@
         this.executor = executor;
     }
 
+    public boolean isCallerRunsWhenRejected() {
+        return callerRunsWhenRejected;
+    }
+
+    public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
+        this.callerRunsWhenRejected = callerRunsWhenRejected;
+    }
+
 }