You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/12 11:54:03 UTC
camel git commit: CAMEL-9791: Threads EIP should use the rejection
handler if configured. And only use caller runs as fallback.
Repository: camel
Updated Branches:
refs/heads/master fd4378ede -> 48f76064e
CAMEL-9791: Threads EIP should use the rejection handler if configured. And only use caller runs as fallback.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/48f76064
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/48f76064
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/48f76064
Branch: refs/heads/master
Commit: 48f76064ef65ffd828940efc5fc078c638350085
Parents: fd4378e
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 12 11:40:48 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 12 11:53:54 2016 +0200
----------------------------------------------------------------------
.../camel/management/mbean/ManagedThreads.java | 11 ++-
.../apache/camel/model/ThreadsDefinition.java | 30 +++++---
.../camel/processor/ThreadsProcessor.java | 72 +++++++++-----------
3 files changed, 59 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/48f76064/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
index 408dbcb..fe78b10 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
@@ -40,13 +40,18 @@ public class ManagedThreads extends ManagedProcessor implements ManagedThreadsMB
@Override
public Boolean isCallerRunsWhenRejected() {
- return processor.isCallerRunsWhenRejected();
+ if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
+ String name = getRejectedPolicy();
+ return "CallerRuns".equals(name);
+ } else {
+ return null;
+ }
}
@Override
public String getRejectedPolicy() {
- if (processor.getRejectedPolicy() != null) {
- return processor.getRejectedPolicy().name();
+ if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
+ return ((ThreadPoolExecutor) processor.getExecutorService()).getRejectedExecutionHandler().toString();
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/48f76064/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
index 065be28..a287659 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
@@ -85,6 +85,19 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple
// prefer any explicit configured executor service
boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false);
+
+ // resolve what rejected policy to use
+ ThreadPoolRejectedPolicy policy = resolveRejectedPolicy(routeContext);
+ if (policy == null) {
+ if (callerRunsWhenRejected == null || callerRunsWhenRejected) {
+ // should use caller runs by default if not configured
+ policy = ThreadPoolRejectedPolicy.CallerRuns;
+ } else {
+ policy = ThreadPoolRejectedPolicy.Abort;
+ }
+ }
+ log.debug("Using ThreadPoolRejectedPolicy: {}", policy);
+
// if no explicit then create from the options
if (threadPool == null) {
ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
@@ -94,7 +107,7 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple
.maxPoolSize(getMaxPoolSize())
.keepAliveTime(getKeepAliveTime(), getTimeUnit())
.maxQueueSize(getMaxQueueSize())
- .rejectedPolicy(getRejectedPolicy())
+ .rejectedPolicy(policy)
.allowCoreThreadTimeOut(getAllowCoreThreadTimeOut())
.build();
threadPool = manager.newThreadPool(this, name, profile);
@@ -112,6 +125,9 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple
if (getKeepAliveTime() != null) {
throw new IllegalArgumentException("KeepAliveTime and executorServiceRef options cannot be used together.");
}
+ if (getTimeUnit() != null) {
+ throw new IllegalArgumentException("TimeUnit and executorServiceRef options cannot be used together.");
+ }
if (getMaxQueueSize() != null) {
throw new IllegalArgumentException("MaxQueueSize and executorServiceRef options cannot be used together.");
}
@@ -123,14 +139,7 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple
}
}
- ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool);
- if (getCallerRunsWhenRejected() == null) {
- // should be true by default
- thread.setCallerRunsWhenRejected(true);
- } else {
- thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
- }
- thread.setRejectedPolicy(resolveRejectedPolicy(routeContext));
+ ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool, policy);
List<Processor> pipe = new ArrayList<Processor>(2);
pipe.add(thread);
@@ -262,7 +271,8 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple
}
/**
- * Whether or not the caller should run the task when it was rejected by the thread pool.
+ * Whether or not to use caller runs as a <b>fallback</b> when a task is rejected from being added to the thread pool (when it is full).
+ * This is only used as a fallback if no rejectedPolicy has been configured, or the thread pool has no configured rejection handler.
* <p/>
* Is by default <tt>true</tt>
*
http://git-wip-us.apache.org/repos/asf/camel/blob/48f76064/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
index 2f97943..b3c6dc4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
@@ -18,6 +18,8 @@ package org.apache.camel.processor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.AsyncCallback;
@@ -62,42 +64,39 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
private String id;
private final CamelContext camelContext;
private final ExecutorService executorService;
+ private final ThreadPoolRejectedPolicy rejectedPolicy;
private volatile boolean shutdownExecutorService;
private final AtomicBoolean shutdown = new AtomicBoolean(true);
- private boolean callerRunsWhenRejected = true;
- private ThreadPoolRejectedPolicy rejectedPolicy;
private final class ProcessCall implements Runnable, Rejectable {
private final Exchange exchange;
private final AsyncCallback callback;
+ private final boolean done;
- ProcessCall(Exchange exchange, AsyncCallback callback) {
+ ProcessCall(Exchange exchange, AsyncCallback callback, boolean done) {
this.exchange = exchange;
this.callback = callback;
+ this.done = done;
}
@Override
public void run() {
- LOG.trace("Continue routing exchange {} ", exchange);
+ LOG.trace("Continue routing exchange {}", exchange);
if (shutdown.get()) {
exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
}
- callback.done(false);
+ callback.done(done);
}
@Override
public void reject() {
- // abort should mark the exchange with an rejected exception
- boolean abort = ThreadPoolRejectedPolicy.Abort == rejectedPolicy;
- if (abort) {
- exchange.setException(new RejectedExecutionException());
- }
- LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange);
-
+ // reject should mark the exchange with a rejected exception and mark it to not be routed anymore
+ exchange.setException(new RejectedExecutionException());
+ LOG.trace("Rejected routing exchange {}", exchange);
if (shutdown.get()) {
exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
}
- callback.done(false);
+ callback.done(done);
}
@Override
@@ -106,12 +105,14 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
}
}
- public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService) {
+ public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService, ThreadPoolRejectedPolicy rejectedPolicy) {
ObjectHelper.notNull(camelContext, "camelContext");
ObjectHelper.notNull(executorService, "executorService");
+ ObjectHelper.notNull(rejectedPolicy, "rejectedPolicy");
this.camelContext = camelContext;
this.executorService = executorService;
this.shutdownExecutorService = shutdownExecutorService;
+ this.rejectedPolicy =rejectedPolicy;
}
public void process(final Exchange exchange) throws Exception {
@@ -131,43 +132,28 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
return true;
}
- ProcessCall call = new ProcessCall(exchange, callback);
try {
+ // process the call in asynchronous mode
+ ProcessCall call = new ProcessCall(exchange, callback, false);
LOG.trace("Submitting task {}", call);
executorService.submit(call);
// tell Camel routing engine we continue routing asynchronous
return false;
- } catch (RejectedExecutionException e) {
- boolean callerRuns = isCallerRunsWhenRejected();
- if (!callerRuns) {
+ } catch (Throwable e) {
+ if (executorService instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor tpe = (ThreadPoolExecutor) executorService;
+ // process the call in synchronous mode
+ ProcessCall call = new ProcessCall(exchange, callback, true);
+ rejectedPolicy.asRejectedExecutionHandler().rejectedExecution(call, tpe);
+ return true;
+ } else {
exchange.setException(e);
+ callback.done(true);
+ return true;
}
-
- LOG.trace("{} executing task {}", callerRuns ? "CallerRuns" : "Aborted", call);
- if (shutdown.get()) {
- exchange.setException(new RejectedExecutionException());
- }
- callback.done(true);
- return true;
}
}
- public boolean isCallerRunsWhenRejected() {
- return callerRunsWhenRejected;
- }
-
- public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
- this.callerRunsWhenRejected = callerRunsWhenRejected;
- }
-
- public ThreadPoolRejectedPolicy getRejectedPolicy() {
- return rejectedPolicy;
- }
-
- public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
- this.rejectedPolicy = rejectedPolicy;
- }
-
public ExecutorService getExecutorService() {
return executorService;
}
@@ -184,6 +170,10 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
this.id = id;
}
+ public ThreadPoolRejectedPolicy getRejectedPolicy() {
+ return rejectedPolicy;
+ }
+
protected void doStart() throws Exception {
shutdown.set(false);
}