You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by kf...@apache.org on 2013/06/19 12:03:52 UTC

svn commit: r1494528 - in /tomcat/tc7.0.x/trunk: java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java java/org/apache/catalina/tribes/util/ExecutorFactory.java webapps/docs/changelog.xml

Author: kfujino
Date: Wed Jun 19 10:03:51 2013
New Revision: 1494528

URL: http://svn.apache.org/r1494528
Log:
Replace the executor's workQueue with Tribes's TaskQueue in order to ensure that the executor's maxThreads setting works correctly.

Modified:
    tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
    tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java
    tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml

Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=1494528&r1=1494527&r2=1494528&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java Wed Jun 19 10:03:51 2013
@@ -16,8 +16,7 @@
  */
 package org.apache.catalina.tribes.group.interceptors;
 
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -25,6 +24,7 @@ import org.apache.catalina.tribes.Channe
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.transport.bio.util.LinkObject;
+import org.apache.catalina.tribes.util.ExecutorFactory;
 import org.apache.catalina.tribes.util.TcclThreadFactory;
 
 /**
@@ -40,11 +40,10 @@ import org.apache.catalina.tribes.util.T
 public class MessageDispatch15Interceptor extends MessageDispatchInterceptor {
 
     protected AtomicLong currentSize = new AtomicLong(0);
-    protected ThreadPoolExecutor executor = null;
+    protected ExecutorService executor = null;
     protected int maxThreads = 10;
     protected int maxSpareThreads = 2;
     protected long keepAliveTime = 5000;
-    protected LinkedBlockingQueue<Runnable> runnablequeue = new LinkedBlockingQueue<Runnable>();
 
     @Override
     public long getCurrentSize() {
@@ -83,9 +82,8 @@ public class MessageDispatch15Intercepto
     @Override
     public void startQueue() {
         if ( run ) return;
-        executor = new ThreadPoolExecutor(maxSpareThreads, maxThreads,
-                keepAliveTime, TimeUnit.MILLISECONDS, runnablequeue,
-                new TcclThreadFactory());
+        executor = ExecutorFactory.newThreadPool(maxSpareThreads, maxThreads,
+                keepAliveTime, TimeUnit.MILLISECONDS, new TcclThreadFactory());
         run = true;
     }
 
@@ -94,7 +92,6 @@ public class MessageDispatch15Intercepto
         run = false;
         executor.shutdownNow();
         setAndGetCurrentSize(0);
-        runnablequeue.clear();
     }
 
     public long getKeepAliveTime() {

Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java?rev=1494528&r1=1494527&r2=1494528&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java Wed Jun 19 10:03:51 2013
@@ -18,9 +18,11 @@
 package org.apache.catalina.tribes.util;
 
 import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -29,18 +31,52 @@ public class ExecutorFactory {
 
     public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit) {
         TaskQueue taskqueue = new TaskQueue();
-        ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue);
+        ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue);
         taskqueue.setParent(service);
         return service;
     }
 
     public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit, ThreadFactory threadFactory) {
         TaskQueue taskqueue = new TaskQueue();
-        ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue, threadFactory);
+        ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue, threadFactory);
         taskqueue.setParent(service);
         return service;
     }
-    
+
+    // ---------------------------------------------- TribesThreadPoolExecutor Inner Class
+    private static class TribesThreadPoolExecutor extends ThreadPoolExecutor {
+        public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+        }
+
+        public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
+                RejectedExecutionHandler handler) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+        }
+
+        public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        }
+
+        public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            try {
+                super.execute(command);
+            } catch (RejectedExecutionException rx) {
+                if (super.getQueue() instanceof TaskQueue) {
+                    TaskQueue queue = (TaskQueue)super.getQueue();
+                    if (!queue.force(command)) {
+                        throw new RejectedExecutionException("Queue capacity is full.");
+                    }
+                }
+            }
+        }
+    }
+
      // ---------------------------------------------- TaskQueue Inner Class
     private static class TaskQueue extends LinkedBlockingQueue<Runnable> {
         private static final long serialVersionUID = 1L;

Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1494528&r1=1494527&r2=1494528&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Wed Jun 19 10:03:51 2013
@@ -90,6 +90,11 @@
       <add>
         Add logging of when a member is unable to join the cluster. (kfujino)
       </add>
+      <fix>
+        Replace Tribes&apos;s <code>TaskQueue</code> as executor&apos;s
+        workQueue in order to ensure that executor&apos;s <code>maxThread</code>
+        works correctly. (kfujino)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Web applications">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org