You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by kf...@apache.org on 2013/06/19 12:03:52 UTC
svn commit: r1494528 - in /tomcat/tc7.0.x/trunk:
java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
java/org/apache/catalina/tribes/util/ExecutorFactory.java
webapps/docs/changelog.xml
Author: kfujino
Date: Wed Jun 19 10:03:51 2013
New Revision: 1494528
URL: http://svn.apache.org/r1494528
Log:
Use Tribes's TaskQueue as the executor's workQueue (replacing the plain LinkedBlockingQueue) in order to ensure that the executor's maxThreads setting works correctly.
Modified:
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java
tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=1494528&r1=1494527&r2=1494528&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java Wed Jun 19 10:03:51 2013
@@ -16,8 +16,7 @@
*/
package org.apache.catalina.tribes.group.interceptors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -25,6 +24,7 @@ import org.apache.catalina.tribes.Channe
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.transport.bio.util.LinkObject;
+import org.apache.catalina.tribes.util.ExecutorFactory;
import org.apache.catalina.tribes.util.TcclThreadFactory;
/**
@@ -40,11 +40,10 @@ import org.apache.catalina.tribes.util.T
public class MessageDispatch15Interceptor extends MessageDispatchInterceptor {
protected AtomicLong currentSize = new AtomicLong(0);
- protected ThreadPoolExecutor executor = null;
+ protected ExecutorService executor = null;
protected int maxThreads = 10;
protected int maxSpareThreads = 2;
protected long keepAliveTime = 5000;
- protected LinkedBlockingQueue<Runnable> runnablequeue = new LinkedBlockingQueue<Runnable>();
@Override
public long getCurrentSize() {
@@ -83,9 +82,8 @@ public class MessageDispatch15Intercepto
@Override
public void startQueue() {
if ( run ) return;
- executor = new ThreadPoolExecutor(maxSpareThreads, maxThreads,
- keepAliveTime, TimeUnit.MILLISECONDS, runnablequeue,
- new TcclThreadFactory());
+ executor = ExecutorFactory.newThreadPool(maxSpareThreads, maxThreads,
+ keepAliveTime, TimeUnit.MILLISECONDS, new TcclThreadFactory());
run = true;
}
@@ -94,7 +92,6 @@ public class MessageDispatch15Intercepto
run = false;
executor.shutdownNow();
setAndGetCurrentSize(0);
- runnablequeue.clear();
}
public long getKeepAliveTime() {
Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java?rev=1494528&r1=1494527&r2=1494528&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java Wed Jun 19 10:03:51 2013
@@ -18,9 +18,11 @@
package org.apache.catalina.tribes.util;
import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -29,18 +31,52 @@ public class ExecutorFactory {
public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit) {
TaskQueue taskqueue = new TaskQueue();
- ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue);
+ ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue);
taskqueue.setParent(service);
return service;
}
public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit, ThreadFactory threadFactory) {
TaskQueue taskqueue = new TaskQueue();
- ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue, threadFactory);
+ ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue, threadFactory);
taskqueue.setParent(service);
return service;
}
-
+
+ // ---------------------------------------------- TribesThreadPoolExecutor Inner Class
+ private static class TribesThreadPoolExecutor extends ThreadPoolExecutor {
+ public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+ }
+
+ public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+ }
+
+ public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ }
+
+ public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ try {
+ super.execute(command);
+ } catch (RejectedExecutionException rx) {
+ if (super.getQueue() instanceof TaskQueue) {
+ TaskQueue queue = (TaskQueue)super.getQueue();
+ if (!queue.force(command)) {
+ throw new RejectedExecutionException("Queue capacity is full.");
+ }
+ }
+ }
+ }
+ }
+
// ---------------------------------------------- TaskQueue Inner Class
private static class TaskQueue extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = 1L;
Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1494528&r1=1494527&r2=1494528&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Wed Jun 19 10:03:51 2013
@@ -90,6 +90,11 @@
<add>
Add logging of when a member is unable to join the cluster. (kfujino)
</add>
+ <fix>
+ Replace Tribes's <code>TaskQueue</code> as executor's
+ workQueue in order to ensure that executor's <code>maxThread</code>
+ works correctly. (kfujino)
+ </fix>
</changelog>
</subsection>
<subsection name="Web applications">
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org