You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/17 08:27:10 UTC
svn commit: r924180 - in
/camel/trunk/camel-core/src/main/java/org/apache/camel:
impl/DefaultEndpoint.java impl/ScheduledPollConsumer.java
processor/SendAsyncProcessor.java
Author: davsclaus
Date: Wed Mar 17 07:27:10 2010
New Revision: 924180
URL: http://svn.apache.org/viewvc?rev=924180&view=rev
Log:
CAMEL-1588: Scheduler do only need to use 1 thread in the pool.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=924180&r1=924179&r2=924180&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Wed Mar 17 07:27:10 2010
@@ -19,8 +19,6 @@ package org.apache.camel.impl;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
@@ -37,12 +35,9 @@ import org.apache.camel.util.ObjectHelpe
* @version $Revision$
*/
public abstract class DefaultEndpoint implements Endpoint, CamelContextAware {
- private static final int DEFAULT_THREADPOOL_SIZE = 10;
-
private String endpointUri;
private CamelContext camelContext;
private Component component;
- private ExecutorService executorService;
private ExchangePattern exchangePattern = ExchangePattern.InOnly;
protected DefaultEndpoint(String endpointUri, Component component) {
@@ -119,25 +114,6 @@ public abstract class DefaultEndpoint im
this.camelContext = camelContext;
}
- /**
- * @deprecated will be removed in Camel 2.4
- */
- @Deprecated
- public synchronized ExecutorService getExecutorService() {
- if (executorService == null) {
- executorService = createScheduledExecutorService();
- }
- return executorService;
- }
-
- /**
- * @deprecated will be removed in Camel 2.4
- */
- @Deprecated
- public synchronized void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
-
public PollingConsumer createPollingConsumer() throws Exception {
return new EventDrivenPollingConsumer(this);
}
@@ -187,14 +163,6 @@ public abstract class DefaultEndpoint im
this.exchangePattern = exchangePattern;
}
- /**
- * @deprecated will be removed in Camel 2.4
- */
- @Deprecated
- protected ScheduledExecutorService createScheduledExecutorService() {
- return getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
- }
-
public void configureProperties(Map<String, Object> options) {
// do nothing by default
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=924180&r1=924179&r2=924180&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Wed Mar 17 07:27:10 2010
@@ -34,7 +34,6 @@ import org.apache.commons.logging.LogFac
* @version $Revision$
*/
public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService {
- private static final int DEFAULT_THREADPOOL_SIZE = 10;
private static final transient Log LOG = LogFactory.getLog(ScheduledPollConsumer.class);
private final ScheduledExecutorService executor;
@@ -51,9 +50,9 @@ public abstract class ScheduledPollConsu
public ScheduledPollConsumer(DefaultEndpoint endpoint, Processor processor) {
super(endpoint, processor);
- // TODO: this executor should also be shutdown when CamelContext stops
+ // we only need one thread in the pool to schedule this task
this.executor = endpoint.getCamelContext().getExecutorServiceStrategy()
- .newScheduledThreadPool(this, endpoint.getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
+ .newScheduledThreadPool(this, endpoint.getEndpointUri(), 1);
ObjectHelper.notNull(executor, "executor");
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=924180&r1=924179&r2=924180&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Wed Mar 17 07:27:10 2010
@@ -42,17 +42,14 @@ import org.apache.camel.util.ExchangeHel
* @version $Revision$
*/
public class SendAsyncProcessor extends SendProcessor implements Runnable, Navigate<Processor> {
-
- private static final int DEFAULT_THREADPOOL_SIZE = 10;
private final CamelContext camelContext;
private final Processor target;
private final BlockingQueue<Exchange> completedTasks = new LinkedBlockingQueue<Exchange>();
private ExecutorService executorService;
private ExecutorService producerExecutorService;
- private int poolSize = DEFAULT_THREADPOOL_SIZE;
+ private int poolSize = 10;
private ExceptionHandler exceptionHandler;
-
public SendAsyncProcessor(Endpoint destination, Processor target) {
super(destination);
this.target = target;
@@ -168,10 +165,6 @@ public class SendAsyncProcessor extends
}
public ExecutorService getExecutorService() {
- if (executorService == null) {
- executorService = destination.getCamelContext().getExecutorServiceStrategy()
- .newThreadPool(this, "SendAsyncProcessor-Consumer", poolSize, poolSize);
- }
return executorService;
}
@@ -184,9 +177,9 @@ public class SendAsyncProcessor extends
this.executorService = executorService;
}
- public ExecutorService getProducerExecutorService() {
+ public synchronized ExecutorService getProducerExecutorService() {
if (producerExecutorService == null) {
- // use a cached pool for the producers which can grow/schrink itself
+ // use a default pool for the producers which can grow/schrink itself
producerExecutorService = destination.getCamelContext().getExecutorServiceStrategy()
.newDefaultThreadPool(this, "SendAsyncProcessor-Producer");
}
@@ -262,7 +255,8 @@ public class SendAsyncProcessor extends
LOG.debug("Async reply received now routing the Exchange: " + exchange);
}
target.process(exchange);
- } catch (Exception e) {
+ } catch (Throwable e) {
+ // must catch throwable to avoid existing this method and thus the thread terminates
getExceptionHandler().handleException(e);
}
}
@@ -272,23 +266,33 @@ public class SendAsyncProcessor extends
protected void doStart() throws Exception {
super.doStart();
+ if (poolSize <= 0) {
+ throw new IllegalArgumentException("PoolSize must be a positive number");
+ }
+
for (int i = 0; i < poolSize; i++) {
- getExecutorService().execute(this);
+ if (executorService == null) {
+ executorService = destination.getCamelContext().getExecutorServiceStrategy()
+ .newFixedThreadPool(this, "SendAsyncProcessor-Consumer", poolSize);
+ }
+ executorService.execute(this);
}
}
protected void doStop() throws Exception {
super.doStop();
- // must shutdown thread pools on stop as we are consumers
- if (producerExecutorService != null) {
- camelContext.getExecutorServiceStrategy().shutdownNow(producerExecutorService);
- producerExecutorService = null;
- }
+ // must shutdown executor service as its used for concurrent consumers
if (executorService != null) {
camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
executorService = null;
}
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ super.doShutdown();
+ // clear the completed tasks when we shutdown
completedTasks.clear();
}