You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/17 08:27:10 UTC

svn commit: r924180 - in /camel/trunk/camel-core/src/main/java/org/apache/camel: impl/DefaultEndpoint.java impl/ScheduledPollConsumer.java processor/SendAsyncProcessor.java

Author: davsclaus
Date: Wed Mar 17 07:27:10 2010
New Revision: 924180

URL: http://svn.apache.org/viewvc?rev=924180&view=rev
Log:
CAMEL-1588: Scheduler do only need to use 1 thread in the pool.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=924180&r1=924179&r2=924180&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Wed Mar 17 07:27:10 2010
@@ -19,8 +19,6 @@ package org.apache.camel.impl;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
@@ -37,12 +35,9 @@ import org.apache.camel.util.ObjectHelpe
  * @version $Revision$
  */
 public abstract class DefaultEndpoint implements Endpoint, CamelContextAware {
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
-
     private String endpointUri;
     private CamelContext camelContext;
     private Component component;
-    private ExecutorService executorService;
     private ExchangePattern exchangePattern = ExchangePattern.InOnly;
 
     protected DefaultEndpoint(String endpointUri, Component component) {
@@ -119,25 +114,6 @@ public abstract class DefaultEndpoint im
         this.camelContext = camelContext;
     }
 
-    /**
-     * @deprecated will be removed in Camel 2.4
-     */
-    @Deprecated
-    public synchronized ExecutorService getExecutorService() {
-        if (executorService == null) {
-            executorService = createScheduledExecutorService();
-        }
-        return executorService;
-    }
-
-    /**
-     * @deprecated will be removed in Camel 2.4
-     */
-    @Deprecated
-    public synchronized void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
-
     public PollingConsumer createPollingConsumer() throws Exception {
         return new EventDrivenPollingConsumer(this);
     }
@@ -187,14 +163,6 @@ public abstract class DefaultEndpoint im
         this.exchangePattern = exchangePattern;
     }
 
-    /**
-     * @deprecated will be removed in Camel 2.4
-     */
-    @Deprecated
-    protected ScheduledExecutorService createScheduledExecutorService() {
-        return getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
-    }
-
     public void configureProperties(Map<String, Object> options) {
         // do nothing by default
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=924180&r1=924179&r2=924180&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Wed Mar 17 07:27:10 2010
@@ -34,7 +34,6 @@ import org.apache.commons.logging.LogFac
  * @version $Revision$
  */
 public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService {
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
     private static final transient Log LOG = LogFactory.getLog(ScheduledPollConsumer.class);
 
     private final ScheduledExecutorService executor;
@@ -51,9 +50,9 @@ public abstract class ScheduledPollConsu
     public ScheduledPollConsumer(DefaultEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
 
-        // TODO: this executor should also be shutdown when CamelContext stops
+        // we only need one thread in the pool to schedule this task
         this.executor = endpoint.getCamelContext().getExecutorServiceStrategy()
-                            .newScheduledThreadPool(this, endpoint.getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
+                            .newScheduledThreadPool(this, endpoint.getEndpointUri(), 1);
         ObjectHelper.notNull(executor, "executor");
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=924180&r1=924179&r2=924180&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Wed Mar 17 07:27:10 2010
@@ -42,17 +42,14 @@ import org.apache.camel.util.ExchangeHel
  * @version $Revision$
  */
 public class SendAsyncProcessor extends SendProcessor implements Runnable, Navigate<Processor> {
-
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
     private final CamelContext camelContext;
     private final Processor target;
     private final BlockingQueue<Exchange> completedTasks = new LinkedBlockingQueue<Exchange>();
     private ExecutorService executorService;
     private ExecutorService producerExecutorService;
-    private int poolSize = DEFAULT_THREADPOOL_SIZE;
+    private int poolSize = 10;
     private ExceptionHandler exceptionHandler;
 
-
     public SendAsyncProcessor(Endpoint destination, Processor target) {
         super(destination);
         this.target = target;
@@ -168,10 +165,6 @@ public class SendAsyncProcessor extends 
     }
 
     public ExecutorService getExecutorService() {
-        if (executorService == null) {
-            executorService = destination.getCamelContext().getExecutorServiceStrategy()
-                                .newThreadPool(this, "SendAsyncProcessor-Consumer", poolSize, poolSize);
-        }
         return executorService;
     }
 
@@ -184,9 +177,9 @@ public class SendAsyncProcessor extends 
         this.executorService = executorService;
     }
 
-    public ExecutorService getProducerExecutorService() {
+    public synchronized ExecutorService getProducerExecutorService() {
         if (producerExecutorService == null) {
-            // use a cached pool for the producers which can grow/schrink itself
+            // use a default pool for the producers which can grow/schrink itself
             producerExecutorService = destination.getCamelContext().getExecutorServiceStrategy()
                                         .newDefaultThreadPool(this, "SendAsyncProcessor-Producer");
         }
@@ -262,7 +255,8 @@ public class SendAsyncProcessor extends 
                         LOG.debug("Async reply received now routing the Exchange: " + exchange);
                     }
                     target.process(exchange);
-                } catch (Exception e) {
+                } catch (Throwable e) {
+                    // must catch throwable to avoid existing this method and thus the thread terminates
                     getExceptionHandler().handleException(e);
                 }
             }
@@ -272,23 +266,33 @@ public class SendAsyncProcessor extends 
     protected void doStart() throws Exception {
         super.doStart();
 
+        if (poolSize <= 0) {
+            throw new IllegalArgumentException("PoolSize must be a positive number");
+        }
+
         for (int i = 0; i < poolSize; i++) {
-            getExecutorService().execute(this);
+            if (executorService == null) {
+                executorService = destination.getCamelContext().getExecutorServiceStrategy()
+                                    .newFixedThreadPool(this, "SendAsyncProcessor-Consumer", poolSize);
+            }
+            executorService.execute(this);
         }
     }
 
     protected void doStop() throws Exception {
         super.doStop();
 
-        // must shutdown thread pools on stop as we are consumers
-        if (producerExecutorService != null) {
-            camelContext.getExecutorServiceStrategy().shutdownNow(producerExecutorService);
-            producerExecutorService = null;
-        }
+        // must shutdown executor service as its used for concurrent consumers
         if (executorService != null) {
             camelContext.getExecutorServiceStrategy().shutdownNow(executorService);
             executorService = null;
         }
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        // clear the completed tasks when we shutdown
         completedTasks.clear();
     }