You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/25 16:44:04 UTC
svn commit: r957996 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/seda/
main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/
Author: davsclaus
Date: Fri Jun 25 14:44:03 2010
New Revision: 957996
URL: http://svn.apache.org/viewvc?rev=957996&view=rev
Log:
CAMEL-2859: Threads DSL now supports the async routing engine. This is a non-backward-compatible change, as the waitForTask option has been removed.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
- copied, changed from r957944, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=957996&r1=957995&r2=957996&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Fri Jun 25 14:44:03 2010
@@ -22,6 +22,8 @@ import java.util.concurrent.BlockingQueu
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -29,6 +31,7 @@ import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.ShutdownAware;
@@ -45,14 +48,14 @@ public class SedaConsumer extends Servic
private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);
private SedaEndpoint endpoint;
- private Processor processor;
+ private AsyncProcessor processor;
private ExecutorService executor;
- private Processor multicast;
+ private MulticastProcessor multicast;
private ExceptionHandler exceptionHandler;
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
this.endpoint = endpoint;
- this.processor = processor;
+ this.processor = AsyncProcessorTypeConverter.convert(processor);
}
@Override
@@ -153,15 +156,25 @@ public class SedaConsumer extends Servic
}
// use a multicast processor to process it
- Processor mp = getMulticastProcessor();
- mp.process(exchange);
+ MulticastProcessor mp = getMulticastProcessor();
+
+ // and use the asynchronous routing engine to support it
+ mp.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // noop
+ }
+ });
} else {
- // use the regular processor
- processor.process(exchange);
+ // use the regular processor and use the asynchronous routing engine to support it
+ processor.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // noop
+ }
+ });
}
}
- protected synchronized Processor getMulticastProcessor() {
+ protected synchronized MulticastProcessor getMulticastProcessor() {
if (multicast == null) {
int size = endpoint.getConsumers().size();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=957996&r1=957995&r2=957996&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri Jun 25 14:44:03 2010
@@ -65,6 +65,8 @@ public class ThreadsDefinition extends O
private String threadName;
@XmlAttribute
private ThreadPoolRejectedPolicy rejectedPolicy;
+ @XmlAttribute
+ private Boolean callerRunsWhenRejected = Boolean.TRUE;
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
@@ -97,11 +99,13 @@ public class ThreadsDefinition extends O
}
ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), executorService);
- Processor childProcessor = createChildProcessor(routeContext, true);
+ if (getCallerRunsWhenRejected() != null) {
+ thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
+ }
List<Processor> pipe = new ArrayList<Processor>(2);
pipe.add(thread);
- pipe.add(childProcessor);
+ pipe.add(createChildProcessor(routeContext, true));
// wrap in nested pipeline so this appears as one processor
return new Pipeline(routeContext.getCamelContext(), pipe);
}
@@ -211,6 +215,19 @@ public class ThreadsDefinition extends O
return this;
}
+ /**
+ * Whether or not the caller should run the task when it was rejected by the thread pool.
+ * <p/>
+ * The default is <tt>true</tt>.
+ *
+ * @param callerRunsWhenRejected whether or not the caller should run
+ * @return the builder
+ */
+ public ThreadsDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
+ setCallerRunsWhenRejected(callerRunsWhenRejected);
+ return this;
+ }
+
public ExecutorService getExecutorService() {
return executorService;
}
@@ -282,4 +299,12 @@ public class ThreadsDefinition extends O
public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
this.rejectedPolicy = rejectedPolicy;
}
+
+ public Boolean getCallerRunsWhenRejected() {
+ return callerRunsWhenRejected;
+ }
+
+ public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
+ this.callerRunsWhenRejected = callerRunsWhenRejected;
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=957996&r1=957995&r2=957996&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Fri Jun 25 14:44:03 2010
@@ -44,6 +44,7 @@ public class ThreadsProcessor extends Se
private final CamelContext camelContext;
private final ExecutorService executorService;
private final AtomicBoolean shutdown = new AtomicBoolean(true);
+ private boolean callerRunsWhenRejected = true;
private final class ProcessCall implements Runnable {
private final Exchange exchange;
@@ -67,8 +68,6 @@ public class ThreadsProcessor extends Se
ObjectHelper.notNull(executorService, "executorService");
this.camelContext = camelContext;
this.executorService = executorService;
- // TODO: if rejection policy of executor service is caller runs then we need to tap into it
- // so we can invoke the callback.done(true) to continue routing synchronously
}
public void process(final Exchange exchange) throws Exception {
@@ -85,13 +84,25 @@ public class ThreadsProcessor extends Se
executorService.submit(call);
return false;
} catch (RejectedExecutionException e) {
- if (shutdown.get()) {
- exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.", e));
+ if (isCallerRunsWhenRejected()) {
+ if (shutdown.get()) {
+ exchange.setException(new RejectedExecutionException());
+ } else {
+ callback.done(true);
+ }
} else {
exchange.setException(e);
}
+ return true;
}
- return true;
+ }
+
+ public boolean isCallerRunsWhenRejected() {
+ return callerRunsWhenRejected;
+ }
+
+ public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
+ this.callerRunsWhenRejected = callerRunsWhenRejected;
}
public String toString() {
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java (from r957944, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java&r1=957944&r2=957996&rev=957996&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCorePoolTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java Fri Jun 25 14:44:03 2010
@@ -16,68 +16,90 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
/**
* @version $Revision$
*/
-public class ThreadsCorePoolTest extends ContextTestSupport {
-
- private static String beforeThreadName;
- private static String afterThreadName;
+public class ThreadsRejectedExecutionTest extends ContextTestSupport {
- public void testThreadsCorePool() throws Exception {
- getMockEndpoint("mock:result").expectedMessageCount(1);
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
- template.sendBody("direct:start", "Hello World");
+ public void testThreadsRejectedExecution() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // use a custom pool which rejects any new tasks while another task is in progress
+ // this should force the ThreadsProcessor to run the tasks itself
+ ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
- assertMockEndpointsSatisfied();
+ context.setTracing(true);
- assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
- }
+ from("seda:start")
+ .to("log:before")
+ // will use our custom pool
+ .threads().executorService(pool)
+ .delay(1000)
+ .to("log:after")
+ .to("mock:result");
+ }
+ });
+ context.start();
- public void testThreadsCorePoolBuilder() throws Exception {
- getMockEndpoint("mock:result").expectedMessageCount(1);
+ getMockEndpoint("mock:result").expectedMessageCount(3);
- template.sendBody("direct:foo", "Hello World");
+ template.sendBody("seda:start", "Hello World");
+ template.sendBody("seda:start", "Hi World");
+ template.sendBody("seda:start", "Bye World");
assertMockEndpointsSatisfied();
-
- assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
}
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
+ public void testThreadsRejectedExecutionCallerNotRuns() throws Exception {
+ context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
+ // use a custom pool which rejects any new tasks while another task is in progress
+ // as callerRunsWhenRejected is false, rejected exchanges fail instead of being run by the caller
+ ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+
context.setTracing(true);
- from("direct:start")
+ from("seda:start")
.to("log:before")
- .process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- beforeThreadName = Thread.currentThread().getName();
- }
- })
- // will use a a custom thread pool with 5 in core and 5 as max
- .threads(5)
- .process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- afterThreadName = Thread.currentThread().getName();
- }
- })
+ // will use our custom pool
+ .threads().executorService(pool).callerRunsWhenRejected(false)
+ .delay(1000)
.to("log:after")
.to("mock:result");
-
- from("direct:foo")
- // using the builder style
- .threads().poolSize(5)
- .to("mock:result");
}
- };
+ });
+ context.start();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(3);
+ // wait at most 5 seconds
+ mock.setResultWaitTime(5000);
+
+ template.sendBody("seda:start", "Hello World");
+ template.sendBody("seda:start", "Hi World");
+ template.sendBody("seda:start", "Bye World");
+
+ // should not be possible to route all 3
+ mock.assertIsNotSatisfied();
+
+ // only 1 should arrive
+ assertEquals(1, mock.getReceivedCounter());
}
+
}
\ No newline at end of file