You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/12/16 19:28:00 UTC

svn commit: r1215240 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/management/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/util/concurrent/ test/java...

Author: davsclaus
Date: Fri Dec 16 18:27:59 2011
New Revision: 1215240

URL: http://svn.apache.org/viewvc?rev=1215240&view=rev
Log:
CAMEL-4786: Add sized scheduled thread pool to ensure scheduled thread pools do not eat up memory, as the JDK pools are unbounded. Fixed throttler and delayer to use thread pool profile so end users can customize the core pool size with these EIPs.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java?rev=1215240&r1=1215239&r2=1215240&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java Fri Dec 16 18:27:59 2011
@@ -42,6 +42,7 @@ import org.apache.camel.support.ServiceS
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.apache.camel.util.concurrent.CamelThreadFactory;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
 import org.apache.camel.util.concurrent.ThreadHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -359,8 +360,13 @@ public class DefaultExecutorServiceManag
         }
 
         // let lifecycle strategy be notified as well which can let it be managed in JMX as well
+        ThreadPoolExecutor threadPool = null;
         if (executorService instanceof ThreadPoolExecutor) {
-            ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService;
+            threadPool = (ThreadPoolExecutor) executorService;
+        } else if (executorService instanceof SizedScheduledExecutorService) {
+            threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
+        }
+        if (threadPool != null) {
             for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
                 lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
             }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java?rev=1215240&r1=1215239&r2=1215240&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultThreadPoolFactory.java Fri Dec 16 18:27:59 2011
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.spi.ThreadPoolFactory;
 import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
 
 /**
  * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
@@ -92,20 +93,22 @@ public class DefaultThreadPoolFactory im
     
     @Override
     public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
-        ScheduledThreadPoolExecutor answer = new ScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory);
-
-        // need to use setters to set the other values as we cannot use a constructor
-        // keep alive and maximum pool size have no effects on a scheduled thread pool as its
-        // a fixed size pool with an unbounded queue (see class javadoc)
-        // TODO: when JDK7 we should setRemoveOnCancelPolicy(true)
-
         RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
         if (rejectedExecutionHandler == null) {
             rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
         }
-        answer.setRejectedExecutionHandler(rejectedExecutionHandler);
 
-        return answer;
+        ScheduledThreadPoolExecutor answer = new ScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
+        // TODO: when JDK7 we should setRemoveOnCancelPolicy(true)
+
+        // need to wrap the thread pool in a sized executor service to guard against the problem
+        // that the JDK created thread pool has an unbounded queue (see class javadoc), which means
+        // we could potentially keep adding tasks, and run out of memory.
+        if (profile.getMaxPoolSize() > 0) {
+            return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
+        } else {
+            return answer;
+        }
     }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java?rev=1215240&r1=1215239&r2=1215240&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java Fri Dec 16 18:27:59 2011
@@ -754,8 +754,8 @@ public class DefaultManagementLifecycleS
 
         boolean enabled = camelContext.getManagementStrategy().getStatisticsLevel() != ManagementStatisticsLevel.Off;
         if (enabled) {
-            LOG.info("StatiticsLevel at {} so enabling load performance statistics", camelContext.getManagementStrategy().getStatisticsLevel());
-            ScheduledExecutorService executorService = camelContext.getExecutorServiceManager().newDefaultScheduledThreadPool(this, "ManagementLoadTask");
+            LOG.info("StatisticsLevel at {} so enabling load performance statistics", camelContext.getManagementStrategy().getStatisticsLevel());
+            ScheduledExecutorService executorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "ManagementLoadTask");
             timerListenerManager.setExecutorService(executorService);
             // must use 1 sec interval as the load statistics is based on 1 sec calculations
             timerListenerManager.setInterval(1000);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java?rev=1215240&r1=1215239&r2=1215240&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java Fri Dec 16 18:27:59 2011
@@ -83,7 +83,7 @@ public class DelayDefinition extends Exp
         if (getAsyncDelayed() != null && getAsyncDelayed()) {
             scheduled = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", this);
             if (scheduled == null) {
-                scheduled = routeContext.getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "Delay");
+                scheduled = routeContext.getCamelContext().getExecutorServiceManager().newDefaultScheduledThreadPool(this, "Delay");
             }
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java?rev=1215240&r1=1215239&r2=1215240&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java Fri Dec 16 18:27:59 2011
@@ -289,13 +289,7 @@ public final class ProcessorDefinitionHe
                 // then create a thread pool assuming the ref is a thread pool profile id
                 ThreadPoolProfile profile = manager.getThreadPoolProfile(definition.getExecutorServiceRef());
                 if (profile != null) {
-                    // okay we need to grab the pool size from the ref
-                    Integer poolSize = profile.getPoolSize();
-                    if (poolSize == null) {
-                        // fallback and use the default pool size, if none was set on the profile
-                        poolSize = manager.getDefaultThreadPoolProfile().getPoolSize();
-                    }
-                    answer = manager.newScheduledThreadPool(definition, name, poolSize);
+                    answer = manager.newScheduledThreadPool(definition, name, profile);
                 }
             }
             if (answer == null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java?rev=1215240&r1=1215239&r2=1215240&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java Fri Dec 16 18:27:59 2011
@@ -87,7 +87,7 @@ public class ThrottleDefinition extends 
         if (getAsyncDelayed() != null && getAsyncDelayed()) {
             scheduled = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this);
             if (scheduled == null) {
-                scheduled = routeContext.getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "Throttle");
+                scheduled = routeContext.getCamelContext().getExecutorServiceManager().newDefaultScheduledThreadPool(this, "Throttle");
             }
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1215240&r1=1215239&r2=1215240&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Fri Dec 16 18:27:59 2011
@@ -84,6 +84,7 @@ public abstract class DelayProcessorSupp
         long delay = calculateDelay(exchange);
         if (delay <= 0) {
             // no delay then continue routing
+            log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
             return super.process(exchange, callback);
         }
 
@@ -113,6 +114,7 @@ public abstract class DelayProcessorSupp
                     if (!isRunAllowed()) {
                         exchange.setException(new RejectedExecutionException());
                     } else {
+                        log.debug("Scheduling rejected task, so letting caller run, delaying at first for {} millis for exchangeId: {}", delay, exchange.getExchangeId());
                         // let caller run by processing
                         try {
                             delay(delay, exchange);

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java?rev=1215240&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java Fri Dec 16 18:27:59 2011
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A sized {@link ScheduledExecutorService} which will reject executing tasks if the task queue is full.
+ * <p/>
+ * The {@link ScheduledThreadPoolExecutor} which is the default implementation of the {@link ScheduledExecutorService}
+ * has an unbounded task queue, which means you can keep scheduling tasks which may cause the system to run out of memory.
+ * <p/>
+ * This class is a wrapper for {@link ScheduledThreadPoolExecutor} to reject executing tasks if an upper limit
+ * of the task queue has been reached.
+ */
+public class SizedScheduledExecutorService implements ScheduledExecutorService {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(SizedScheduledExecutorService.class); 
+    private final ScheduledThreadPoolExecutor delegate;
+    private final long queueSize;
+
+    /**
+     * Creates a new sized {@link ScheduledExecutorService} with the given queue size as upper task limit.
+     *
+     * @param delegate   the delegate of the actual thread pool implementation
+     * @param queueSize  the upper queue size, use 0 or negative value for unlimited
+     */
+    public SizedScheduledExecutorService(ScheduledThreadPoolExecutor delegate, long queueSize) {
+        this.delegate = delegate;
+        this.queueSize = queueSize;
+    }
+
+    /**
+     * Gets the wrapped {@link ScheduledThreadPoolExecutor}
+     */
+    public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
+        return delegate;
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit timeUnit) {
+        if (canScheduleOrExecute()) {
+            return delegate.schedule(task, delay, timeUnit);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit timeUnit) {
+        if (canScheduleOrExecute()) {
+            return delegate.schedule(task, delay, timeUnit);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit timeUnit) {
+        if (canScheduleOrExecute()) {
+            return delegate.scheduleAtFixedRate(task, initialDelay, period, timeUnit);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long period, TimeUnit timeUnit) {
+        if (canScheduleOrExecute()) {
+            return delegate.scheduleWithFixedDelay(task, initialDelay, period, timeUnit);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException {
+        return delegate.awaitTermination(timeout, timeUnit);
+    }
+
+    public int getActiveCount() {
+        return delegate.getActiveCount();
+    }
+
+    public long getCompletedTaskCount() {
+        return delegate.getCompletedTaskCount();
+    }
+
+    public int getCorePoolSize() {
+        return delegate.getCorePoolSize();
+    }
+
+    public long getKeepAliveTime(TimeUnit timeUnit) {
+        return delegate.getKeepAliveTime(timeUnit);
+    }
+
+    public int getLargestPoolSize() {
+        return delegate.getLargestPoolSize();
+    }
+
+    public int getMaximumPoolSize() {
+        return delegate.getMaximumPoolSize();
+    }
+
+    public int getPoolSize() {
+        return delegate.getPoolSize();
+    }
+
+    public RejectedExecutionHandler getRejectedExecutionHandler() {
+        return delegate.getRejectedExecutionHandler();
+    }
+
+    public long getTaskCount() {
+        return delegate.getTaskCount();
+    }
+
+    public ThreadFactory getThreadFactory() {
+        return delegate.getThreadFactory();
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        if (canScheduleOrExecute()) {
+            return delegate.invokeAll(tasks);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit timeUnit) throws InterruptedException {
+        if (canScheduleOrExecute()) {
+            return delegate.invokeAll(tasks, timeout, timeUnit);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        if (canScheduleOrExecute()) {
+            return delegate.invokeAny(tasks);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
+        if (canScheduleOrExecute()) {
+            return delegate.invokeAny(tasks, timeout, timeUnit);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return delegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return delegate.isTerminated();
+    }
+
+    public boolean isTerminating() {
+        return delegate.isTerminating();
+    }
+
+    public int prestartAllCoreThreads() {
+        return delegate.prestartAllCoreThreads();
+    }
+
+    public boolean prestartCoreThread() {
+        return delegate.prestartCoreThread();
+    }
+
+    public void purge() {
+        delegate.purge();
+    }
+
+    public void setCorePoolSize(int corePoolSize) {
+        delegate.setCorePoolSize(corePoolSize);
+    }
+
+    public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
+        delegate.setKeepAliveTime(keepAliveTime, timeUnit);
+    }
+
+    public void setMaximumPoolSize(int maximumPoolSize) {
+        delegate.setMaximumPoolSize(maximumPoolSize);
+    }
+
+    public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
+        delegate.setRejectedExecutionHandler(rejectedExecutionHandler);
+    }
+
+    public void setThreadFactory(ThreadFactory threadFactory) {
+        delegate.setThreadFactory(threadFactory);
+    }
+
+    @Override
+    public void shutdown() {
+        delegate.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return delegate.shutdownNow();
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        if (canScheduleOrExecute()) {
+            return delegate.submit(task);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        if (canScheduleOrExecute()) {
+            return delegate.submit(task);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        if (canScheduleOrExecute()) {
+            return delegate.submit(task, result);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    @Override
+    public void execute(Runnable task) {
+        if (canScheduleOrExecute()) {
+            delegate.execute(task);
+        } else {
+            throw new RejectedExecutionException("Task rejected due queue size limit reached");
+        }
+    }
+
+    public void allowCoreThreadTimeOut(boolean value) {
+        delegate.allowCoreThreadTimeOut(value);
+    }
+
+    public boolean allowsCoreThreadTimeOut() {
+        return delegate.allowsCoreThreadTimeOut();
+    }
+
+    /**
+     * Can the task be scheduled or executed?
+     *
+     * @return <tt>true</tt> to accept, <tt>false</tt> to not accept
+     */
+    protected boolean canScheduleOrExecute() {
+        if (queueSize <= 0) {
+            return true;
+        }
+
+        int size = delegate.getQueue().size();
+        boolean answer = size < queueSize;
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("canScheduleOrExecute {} < {} -> {}", new Object[]{size, queueSize, answer});
+        }
+        return answer;
+    }
+
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
+}

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java?rev=1215240&r1=1215239&r2=1215240&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java Fri Dec 16 18:27:59 2011
@@ -17,13 +17,13 @@
 package org.apache.camel.impl;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.ThreadPoolRejectedPolicy;
 import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
 
 /**
  * @version 
@@ -387,7 +387,7 @@ public class DefaultExecutorServiceManag
         ExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(this, "Cool", 5);
         assertNotNull(pool);
 
-        ScheduledThreadPoolExecutor tp = assertIsInstanceOf(ScheduledThreadPoolExecutor.class, pool);
+        SizedScheduledExecutorService tp = assertIsInstanceOf(SizedScheduledExecutorService.class, pool);
         // a scheduled dont use keep alive
         assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS));
         assertEquals(Integer.MAX_VALUE, tp.getMaximumPoolSize());
@@ -403,7 +403,7 @@ public class DefaultExecutorServiceManag
         ExecutorService pool = context.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "Cool");
         assertNotNull(pool);
 
-        ScheduledThreadPoolExecutor tp = assertIsInstanceOf(ScheduledThreadPoolExecutor.class, pool);
+        SizedScheduledExecutorService tp = assertIsInstanceOf(SizedScheduledExecutorService.class, pool);
         // a scheduled dont use keep alive
         assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS));
         assertEquals(Integer.MAX_VALUE, tp.getMaximumPoolSize());
@@ -444,7 +444,7 @@ public class DefaultExecutorServiceManag
         ExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(this, "Cool", "foo");
         assertNotNull(pool);
 
-        ScheduledThreadPoolExecutor tp = assertIsInstanceOf(ScheduledThreadPoolExecutor.class, pool);
+        SizedScheduledExecutorService tp = assertIsInstanceOf(SizedScheduledExecutorService.class, pool);
         // a scheduled dont use keep alive
         assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS));
         assertEquals(Integer.MAX_VALUE, tp.getMaximumPoolSize());

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java?rev=1215240&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java Fri Dec 16 18:27:59 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
+
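+/**
+ * Tests the async delayed throttler with a thread pool profile limited to a queue size of 2 and caller runs when rejected enabled.
+ */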
+public class ThrottlerAsyncDelayedCallerRunsTest extends ContextTestSupport {
+    
+    public void testThrottler() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(6);
+
+        template.sendBody("seda:start", "A");
+        template.sendBody("seda:start", "B");
+        template.sendBody("seda:start", "C");
+        template.sendBody("seda:start", "D");
+        template.sendBody("seda:start", "E");
+        template.sendBody("seda:start", "F");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // create a profile for the throttler
+                ThreadPoolProfileBuilder builder = new ThreadPoolProfileBuilder("myThrottler");
+                builder.maxQueueSize(2);
+                context.getExecutorServiceManager().registerThreadPoolProfile(builder.build());
+                
+                from("seda:start")
+                    .throttle(1).timePeriodMillis(100)
+                        .asyncDelayed().executorServiceRef("myThrottler").callerRunsWhenRejected(true)
+                    .to("mock:result");
+            }
+        };
+    }
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java?rev=1215240&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java Fri Dec 16 18:27:59 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
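+/**
+ * Tests that {@link SizedScheduledExecutorService} rejects scheduling a task once its queue size limit is reached.
+ */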
+public class SizedScheduledExecutorServiceTest extends TestCase {
+    
+    public void testSizedScheduledExecutorService() throws Exception {
+        ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(5);
+        
+        SizedScheduledExecutorService sized = new SizedScheduledExecutorService(delegate, 2);
+        
+        Runnable task = new Runnable() {
+            @Override
+            public void run() {
+                // noop
+            }
+        };
+
+        sized.schedule(task, 2, TimeUnit.SECONDS);
+        sized.schedule(task, 3, TimeUnit.SECONDS);
+        
+        try {
+            sized.schedule(task, 4, TimeUnit.SECONDS);
+            fail("Should have thrown exception");
+        } catch (RejectedExecutionException e) {
+            assertEquals("Task rejected due queue size limit reached", e.getMessage());
+        }
+
+        sized.shutdownNow();
+        assertTrue("Should be shutdown", sized.isShutdown() || sized.isTerminating());
+        assertTrue("Should be shutdown", delegate.isShutdown() || sized.isTerminating());
+    }
+}