You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/25 16:44:04 UTC

svn commit: r957996 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/

Author: davsclaus
Date: Fri Jun 25 14:44:03 2010
New Revision: 957996

URL: http://svn.apache.org/viewvc?rev=957996&view=rev
Log:
CAMEL-2859: Threads DSL now supports the async routing engine. Non-backward-compatible change, as the waitForTask option has been removed.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
      - copied, changed from r957944, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=957996&r1=957995&r2=957996&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Fri Jun 25 14:44:03 2010
@@ -22,6 +22,8 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -29,6 +31,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ShutdownAware;
@@ -45,14 +48,14 @@ public class SedaConsumer extends Servic
     private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);
 
     private SedaEndpoint endpoint;
-    private Processor processor;
+    private AsyncProcessor processor;
     private ExecutorService executor;
-    private Processor multicast;
+    private MulticastProcessor multicast;
     private ExceptionHandler exceptionHandler;
 
     public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
         this.endpoint = endpoint;
-        this.processor = processor;
+        this.processor = AsyncProcessorTypeConverter.convert(processor);
     }
 
     @Override
@@ -153,15 +156,25 @@ public class SedaConsumer extends Servic
             }
 
             // use a multicast processor to process it
-            Processor mp = getMulticastProcessor();
-            mp.process(exchange);
+            MulticastProcessor mp = getMulticastProcessor();
+
+            // and use the asynchronous routing engine to support it
+            mp.process(exchange, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    // noop
+                }
+            });
         } else {
-            // use the regular processor
-            processor.process(exchange);
+            // use the regular processor and use the asynchronous routing engine to support it
+            processor.process(exchange, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    // noop
+                }
+            });
         }
     }
 
-    protected synchronized Processor getMulticastProcessor() {
+    protected synchronized MulticastProcessor getMulticastProcessor() {
         if (multicast == null) {
             int size = endpoint.getConsumers().size();
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=957996&r1=957995&r2=957996&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri Jun 25 14:44:03 2010
@@ -65,6 +65,8 @@ public class ThreadsDefinition extends O
     private String threadName;
     @XmlAttribute
     private ThreadPoolRejectedPolicy rejectedPolicy;
+    @XmlAttribute
+    private Boolean callerRunsWhenRejected = Boolean.TRUE;
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
@@ -97,11 +99,13 @@ public class ThreadsDefinition extends O
         }
 
         ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), executorService);
-        Processor childProcessor = createChildProcessor(routeContext, true);
+        if (getCallerRunsWhenRejected() != null) {
+            thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
+        }
 
         List<Processor> pipe = new ArrayList<Processor>(2);
         pipe.add(thread);
-        pipe.add(childProcessor);
+        pipe.add(createChildProcessor(routeContext, true));
         // wrap in nested pipeline so this appears as one processor
         return new Pipeline(routeContext.getCamelContext(), pipe);
     }
@@ -211,6 +215,19 @@ public class ThreadsDefinition extends O
         return this;
     }
 
+    /**
+     * Whether or not the caller should run the task when it was rejected by the thread pool.
+     * <p/>
+     * The default value is <tt>true</tt>.
+     *
+     * @param callerRunsWhenRejected whether or not the caller should run
+     * @return the builder
+     */
+    public ThreadsDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
+        setCallerRunsWhenRejected(callerRunsWhenRejected);
+        return this;
+    }
+
     public ExecutorService getExecutorService() {
         return executorService;
     }
@@ -282,4 +299,12 @@ public class ThreadsDefinition extends O
     public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
         this.rejectedPolicy = rejectedPolicy;
     }
+
+    public Boolean getCallerRunsWhenRejected() {
+        return callerRunsWhenRejected;
+    }
+
+    public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
+        this.callerRunsWhenRejected = callerRunsWhenRejected;
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=957996&r1=957995&r2=957996&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Fri Jun 25 14:44:03 2010
@@ -44,6 +44,7 @@ public class ThreadsProcessor extends Se
     private final CamelContext camelContext;
     private final ExecutorService executorService;
     private final AtomicBoolean shutdown = new AtomicBoolean(true);
+    private boolean callerRunsWhenRejected = true;
 
     private final class ProcessCall implements Runnable {
         private final Exchange exchange;
@@ -67,8 +68,6 @@ public class ThreadsProcessor extends Se
         ObjectHelper.notNull(executorService, "executorService");
         this.camelContext = camelContext;
         this.executorService = executorService;
-        // TODO: if rejection policy of executor service is caller runs then we need to tap into it
-        // so we can invoke the callback.done(true) to continue routing synchronously
     }
 
     public void process(final Exchange exchange) throws Exception {
@@ -85,13 +84,25 @@ public class ThreadsProcessor extends Se
             executorService.submit(call);
             return false;
         } catch (RejectedExecutionException e) {
-            if (shutdown.get()) {
-                exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.", e));
+            if (isCallerRunsWhenRejected()) {
+                if (shutdown.get()) {
+                    exchange.setException(new RejectedExecutionException());
+                } else {
+                    callback.done(true);
+                }
             } else {
                 exchange.setException(e);
             }
+            return true;
         }
-        return true;
+    }
+
+    public boolean isCallerRunsWhenRejected() {
+        return callerRunsWhenRejected;
+    }
+
+    public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
+        this.callerRunsWhenRejected = callerRunsWhenRejected;
     }
 
     public String toString() {

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java (from r957944, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java&r1=957944&r2=957996&rev=957996&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java Fri Jun 25 14:44:03 2010
@@ -16,68 +16,90 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version $Revision$
  */
-public class ThreadsCorePoolTest extends ContextTestSupport {
-
-    private static String beforeThreadName;
-    private static String afterThreadName;
+public class ThreadsRejectedExecutionTest extends ContextTestSupport {
 
-    public void testThreadsCorePool() throws Exception {
-        getMockEndpoint("mock:result").expectedMessageCount(1);
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
 
-        template.sendBody("direct:start", "Hello World");
+    public void testThreadsRejectedExecution() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // use a custom pool which rejects any new task while another task is still in progress
+                // this should force the ThreadsProcessor to run the tasks itself
+                ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
 
-        assertMockEndpointsSatisfied();
+                context.setTracing(true);
 
-        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
-    }
+                from("seda:start")
+                    .to("log:before")
+                    // will use our custom pool
+                    .threads().executorService(pool)
+                    .delay(1000)
+                    .to("log:after")
+                    .to("mock:result");
+            }
+        });
+        context.start();
 
-    public void testThreadsCorePoolBuilder() throws Exception {
-        getMockEndpoint("mock:result").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(3);
 
-        template.sendBody("direct:foo", "Hello World");
+        template.sendBody("seda:start", "Hello World");
+        template.sendBody("seda:start", "Hi World");
+        template.sendBody("seda:start", "Bye World");
 
         assertMockEndpointsSatisfied();
-
-        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
+    public void testThreadsRejectedExecutionCallerNotRuns() throws Exception {
+        context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                // use a custom pool which rejects any new task while another task is still in progress
+                // as callerRunsWhenRejected is false, rejected tasks are not run by the caller and fail instead
+                ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+
                 context.setTracing(true);
 
-                from("direct:start")
+                from("seda:start")
                     .to("log:before")
-                    .process(new Processor() {
-                        public void process(Exchange exchange) throws Exception {
-                            beforeThreadName = Thread.currentThread().getName();
-                        }
-                    })
-                    // will use a a custom thread pool with 5 in core and 5 as max
-                    .threads(5)
-                    .process(new Processor() {
-                        public void process(Exchange exchange) throws Exception {
-                            afterThreadName = Thread.currentThread().getName();
-                        }
-                    })
+                    // will use our custom pool
+                    .threads().executorService(pool).callerRunsWhenRejected(false)
+                    .delay(1000)
                     .to("log:after")
                     .to("mock:result");
-
-                from("direct:foo")
-                    // using the builder style
-                    .threads().poolSize(5)
-                    .to("mock:result");
             }
-        };
+        });
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+        // wait at most 5 seconds
+        mock.setResultWaitTime(5000);
+
+        template.sendBody("seda:start", "Hello World");
+        template.sendBody("seda:start", "Hi World");
+        template.sendBody("seda:start", "Bye World");
+
+        // should not be possible to route all 3
+        mock.assertIsNotSatisfied();
+
+        // only 1 should arrive
+        assertEquals(1, mock.getReceivedCounter());
     }
+
 }
\ No newline at end of file