You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2015/12/03 13:12:09 UTC

svn commit: r1717747 - /river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java

Author: peter_firmstone
Date: Thu Dec  3 12:12:08 2015
New Revision: 1717747

URL: http://svn.apache.org/viewvc?rev=1717747&view=rev
Log:
ThreadPool's implementation was not optimal and was found to be a hotspot in Outrigger stress tests; it is now a wrapper around a Java concurrent Executor.

Modified:
    river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java

Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java?rev=1717747&r1=1717746&r2=1717747&view=diff
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java (original)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java Thu Dec  3 12:12:08 2015
@@ -18,14 +18,12 @@
 
 package org.apache.river.thread;
 
-import org.apache.river.action.GetLongAction;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadFactory;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -63,20 +61,8 @@ import java.util.logging.Logger;
  **/
 final class ThreadPool implements Executor, java.util.concurrent.Executor {
 
-    /** how long a thread waits in the idle state before passing away */
-    private static final long IDLE_TIMEOUT =		// default 5 minutes
-	((Long) AccessController.doPrivileged(new GetLongAction(
-	    "org.apache.river.thread.idleThreadTimeout", 300000)))
-	    .longValue();
-
     private static final Logger logger =
 	Logger.getLogger("org.apache.river.thread.ThreadPool");
-
-    /** thread group that this pool's threads execute in */
-    private final ThreadGroup threadGroup;
-
-    /** queues of tasks to execute */
-    private final BlockingQueue<Runnable> queue;
     
     /** 
      * This Executor is used by JERI (and other Jini implementation classes) 
@@ -98,18 +84,6 @@ final class ThreadPool implements Execut
      * ThreadPool must degrade gracefully when a system is under significant
      * load, but it must also execute tasks as soon as possible.
      * 
-     * To address these issues, a SynchronousQueue was originally selected, it has
-     * no storage capacity, it hands tasks directly from the calling thread to
-     * the task thread, however contention can cause more threads than necessary
-     * to be created, a LinkedBlockingQueue eliminates or reduces contention 
-     * between caller and worker threads, preventing unnecessary thread creation. 
-     * Consider TransferBlockingQueue when Java 6 is no longer supported.
-     * 
-     * Pool threads block waiting until a task is available or idleTimeout
-     * occurs after which the pool thread dies, client threads block waiting 
-     * until a task thread is available, or after an computed timeout elapses, 
-     * creates a new thread to execute the task.
-     * 
      * ThreadGroup is a construct originally intended for applet isolation, 
      * however it was never really successful, AccessControlContext 
      * is a much more effective way of controlling privilege.
@@ -117,25 +91,13 @@ final class ThreadPool implements Execut
      * We should consider changing this to ensure that each task is executed in the
      * AccessControlContext of the calling thread, to avoid privilege escalation.
      */
-    private final AtomicInteger workerCount;
-    private final AtomicInteger availableThreads;
     private volatile boolean shutdown = false;
+    private final ExecutorService es;
     
     ThreadPool(ThreadGroup threadGroup){
-        this(threadGroup, 10);
-    }
-
-    /**
-     * Creates a new thread group that executes tasks in threads of
-     * the given thread group.
-     */
-    ThreadPool(final ThreadGroup threadGroup, int delayFactor) {
-	this.threadGroup = threadGroup;
-        queue = new ArrayBlockingQueue<Runnable>(32);
-        workerCount = new AtomicInteger();
-        availableThreads = new AtomicInteger();
-//         Thread not started until after constructor completes
-//         this escaping occurs safely.
+        this(Executors.newCachedThreadPool(new TPThreadFactory(threadGroup))); // Final field freeze
+//      Thread not started until after constructor completes
+//      this escaping occurs safely anyway because of final field freeze.
         AccessController.doPrivileged(new PrivilegedAction(){
 
             @Override
@@ -145,6 +107,10 @@ final class ThreadPool implements Execut
             }
         });
     }
+ 
+    private ThreadPool(ExecutorService es){
+        this.es = es;
+    }
     
     private Thread shutdownHook(){
         Thread t = new Thread ( new Runnable(){
@@ -158,11 +124,7 @@ final class ThreadPool implements Execut
                     Thread.currentThread().interrupt();
                 }
                 shutdown = true;
-                Thread [] threads = new Thread [workerCount.get() + 1 ];
-                int count = threadGroup.enumerate(threads);
-                for (int i = 0; i < count; i++){
-                    threads [i].interrupt();
-                }
+                es.shutdown();
             }
         },"ThreadPool destroy");
         /**
@@ -180,27 +142,8 @@ final class ThreadPool implements Execut
     public void execute(Runnable runnable, String name) throws RejectedExecutionException {
         if (runnable == null) return;
         if (shutdown) throw new RejectedExecutionException("ThreadPool shutdown");
-	Runnable task = new Task(runnable, name);
-        /* Startup ramps up very quickly because there are no waiting
-         * threads available.
-         * 
-         * Tasks must not be allowed to build up in the queue, in case
-         * of dependencies. 
-         */
-        if ( availableThreads.get() < 2 )// Keep at least one thread ready. 
-        { // need more threads.
-            if (shutdown) throw 
-                new RejectedExecutionException("ThreadPool shutdown");
-                Thread t = AccessController.doPrivileged(
-                        new NewThreadAction(threadGroup, new Worker(task), name, false, 228));
-                t.start();
-        } else {
-            try {
-                queue.put(task);
-            } catch (InterruptedException ex) {
-                Thread.currentThread().interrupt();
-            }
-        }
+        Runnable task = new Task(runnable, name);
+        es.submit(task);
     }
 
     @Override
@@ -223,7 +166,9 @@ final class ThreadPool implements Execut
         
         @Override
         public void run(){
+            Thread thread = Thread.currentThread();
             try {
+                thread.setName(NewThreadAction.NAME_PREFIX + name);
                 runnable.run();
             } catch (Exception t) { // Don't catch Error
                 logger.log(Level.WARNING, "uncaught exception", t);
@@ -239,6 +184,8 @@ final class ThreadPool implements Execut
                     // set so the while loop stops.
                     Thread.currentThread().interrupt();
                 }
+            } finally {
+                thread.setName(NewThreadAction.NAME_PREFIX + "idle");
             }
         }
 
@@ -247,58 +194,24 @@ final class ThreadPool implements Execut
             return name;
         }
     }
-
+    
     /**
-     * Worker executes an initial task, and then it executes tasks from the
-     * queue, passing away if ever idle for more than the idle timeout value.
+     * Thread stack size hint given to jvm to minimise memory consumption
+     * as this executor can create many threads, tasks executed are relatively
+     * simple and don't need much memory.  The jvm is free to ignore this hint.
      */
-    private class Worker implements Runnable {
-
-	private volatile Runnable first;
-
-	Worker(Runnable first) {
-	    this.first = first;
-	}
+    private static class TPThreadFactory implements ThreadFactory {
+        /** thread group that this pool's threads execute in */
+        final ThreadGroup threadGroup;
+        
+        TPThreadFactory (ThreadGroup group){
+            threadGroup = group;
+        }
 
-        @Override
-	public void run() {
-            workerCount.incrementAndGet();
-            try {
-                Runnable task = first;
-                first = null; // For garbage collection.
-                task.run();
-                Thread thread = Thread.currentThread();
-                while (!thread.isInterrupted()) {
-                    /*
-                     * REMIND: What if the task changed this thread's
-                     * priority? or context class loader?
-                     * 
-                     * thread.setName is not thread safe, so may not reflect
-                     * most up to date state
-                     */
-                    try {
-                        task = null;
-                        availableThreads.incrementAndGet();
-                        try {
-                        task = queue.poll(IDLE_TIMEOUT, TimeUnit.MILLISECONDS);
-                        } finally {
-                            availableThreads.decrementAndGet();
-                        }
-                        if (task != null) {
-                            thread.setName(NewThreadAction.NAME_PREFIX + task);
-                            task.run();
-                            thread.setName(NewThreadAction.NAME_PREFIX + "Idle");
-                        } else {
-                            break; //Timeout or spurious wakeup.
-                        }                
-                    } catch (InterruptedException e){
-                        thread.interrupt();
-                        break;
-                    }
-                }
-            } finally {
-                workerCount.decrementAndGet();
-            }
+        public Thread newThread(Runnable r) {
+            return AccessController.doPrivileged(
+                        new NewThreadAction(threadGroup, r, NewThreadAction.NAME_PREFIX, false, 228));
         }
+        
     }
 }