You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2015/12/03 13:12:09 UTC
svn commit: r1717747 -
/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java
Author: peter_firmstone
Date: Thu Dec 3 12:12:08 2015
New Revision: 1717747
URL: http://svn.apache.org/viewvc?rev=1717747&view=rev
Log:
ThreadPool's implementation was not optimal and was found to be a hotspot in outrigger stress tests, it is now a wrapper around a java concurrent Executor.
Modified:
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java
Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java?rev=1717747&r1=1717746&r2=1717747&view=diff
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java (original)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java Thu Dec 3 12:12:08 2015
@@ -18,14 +18,12 @@
package org.apache.river.thread;
-import org.apache.river.action.GetLongAction;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -63,20 +61,8 @@ import java.util.logging.Logger;
**/
final class ThreadPool implements Executor, java.util.concurrent.Executor {
- /** how long a thread waits in the idle state before passing away */
- private static final long IDLE_TIMEOUT = // default 5 minutes
- ((Long) AccessController.doPrivileged(new GetLongAction(
- "org.apache.river.thread.idleThreadTimeout", 300000)))
- .longValue();
-
private static final Logger logger =
Logger.getLogger("org.apache.river.thread.ThreadPool");
-
- /** thread group that this pool's threads execute in */
- private final ThreadGroup threadGroup;
-
- /** queues of tasks to execute */
- private final BlockingQueue<Runnable> queue;
/**
* This Executor is used by JERI (and other Jini implementation classes)
@@ -98,18 +84,6 @@ final class ThreadPool implements Execut
* ThreadPool must degrade gracefully when a system is under significant
* load, but it must also execute tasks as soon as possible.
*
- * To address these issues, a SynchronousQueue was originally selected, it has
- * no storage capacity, it hands tasks directly from the calling thread to
- * the task thread, however contention can cause more threads than necessary
- * to be created, a LinkedBlockingQueue eliminates or reduces contention
- * between caller and worker threads, preventing unnecessary thread creation.
- * Consider TransferBlockingQueue when Java 6 is no longer supported.
- *
- * Pool threads block waiting until a task is available or idleTimeout
- * occurs after which the pool thread dies, client threads block waiting
- * until a task thread is available, or after an computed timeout elapses,
- * creates a new thread to execute the task.
- *
* ThreadGroup is a construct originally intended for applet isolation,
* however it was never really successful, AccessControlContext
* is a much more effective way of controlling privilege.
@@ -117,25 +91,13 @@ final class ThreadPool implements Execut
* We should consider changing this to ensure that each task is executed in the
* AccessControlContext of the calling thread, to avoid privilege escalation.
*/
- private final AtomicInteger workerCount;
- private final AtomicInteger availableThreads;
private volatile boolean shutdown = false;
+ private final ExecutorService es;
ThreadPool(ThreadGroup threadGroup){
- this(threadGroup, 10);
- }
-
- /**
- * Creates a new thread group that executes tasks in threads of
- * the given thread group.
- */
- ThreadPool(final ThreadGroup threadGroup, int delayFactor) {
- this.threadGroup = threadGroup;
- queue = new ArrayBlockingQueue<Runnable>(32);
- workerCount = new AtomicInteger();
- availableThreads = new AtomicInteger();
-// Thread not started until after constructor completes
-// this escaping occurs safely.
+ this(Executors.newCachedThreadPool(new TPThreadFactory(threadGroup))); // Final field freeze
+// Thread not started until after constructor completes
+// this escaping occurs safely anyway because of final field freeze.
AccessController.doPrivileged(new PrivilegedAction(){
@Override
@@ -145,6 +107,10 @@ final class ThreadPool implements Execut
}
});
}
+
+ private ThreadPool(ExecutorService es){
+ this.es = es;
+ }
private Thread shutdownHook(){
Thread t = new Thread ( new Runnable(){
@@ -158,11 +124,7 @@ final class ThreadPool implements Execut
Thread.currentThread().interrupt();
}
shutdown = true;
- Thread [] threads = new Thread [workerCount.get() + 1 ];
- int count = threadGroup.enumerate(threads);
- for (int i = 0; i < count; i++){
- threads [i].interrupt();
- }
+ es.shutdown();
}
},"ThreadPool destroy");
/**
@@ -180,27 +142,8 @@ final class ThreadPool implements Execut
public void execute(Runnable runnable, String name) throws RejectedExecutionException {
if (runnable == null) return;
if (shutdown) throw new RejectedExecutionException("ThreadPool shutdown");
- Runnable task = new Task(runnable, name);
- /* Startup ramps up very quickly because there are no waiting
- * threads available.
- *
- * Tasks must not be allowed to build up in the queue, in case
- * of dependencies.
- */
- if ( availableThreads.get() < 2 )// Keep at least one thread ready.
- { // need more threads.
- if (shutdown) throw
- new RejectedExecutionException("ThreadPool shutdown");
- Thread t = AccessController.doPrivileged(
- new NewThreadAction(threadGroup, new Worker(task), name, false, 228));
- t.start();
- } else {
- try {
- queue.put(task);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
+ Runnable task = new Task(runnable, name);
+ es.submit(task);
}
@Override
@@ -223,7 +166,9 @@ final class ThreadPool implements Execut
@Override
public void run(){
+ Thread thread = Thread.currentThread();
try {
+ thread.setName(NewThreadAction.NAME_PREFIX + name);
runnable.run();
} catch (Exception t) { // Don't catch Error
logger.log(Level.WARNING, "uncaught exception", t);
@@ -239,6 +184,8 @@ final class ThreadPool implements Execut
// set so the while loop stops.
Thread.currentThread().interrupt();
}
+ } finally {
+ thread.setName(NewThreadAction.NAME_PREFIX + "idle");
}
}
@@ -247,58 +194,24 @@ final class ThreadPool implements Execut
return name;
}
}
-
+
/**
- * Worker executes an initial task, and then it executes tasks from the
- * queue, passing away if ever idle for more than the idle timeout value.
+ * Thread stack size hint given to jvm to minimise memory consumption
+ * as this executor can create many threads, tasks executed are relatively
+ * simple and don't need much memory. The jvm is free to ignore this hint.
*/
- private class Worker implements Runnable {
-
- private volatile Runnable first;
-
- Worker(Runnable first) {
- this.first = first;
- }
+ private static class TPThreadFactory implements ThreadFactory {
+ /** thread group that this pool's threads execute in */
+ final ThreadGroup threadGroup;
+
+ TPThreadFactory (ThreadGroup group){
+ threadGroup = group;
+ }
- @Override
- public void run() {
- workerCount.incrementAndGet();
- try {
- Runnable task = first;
- first = null; // For garbage collection.
- task.run();
- Thread thread = Thread.currentThread();
- while (!thread.isInterrupted()) {
- /*
- * REMIND: What if the task changed this thread's
- * priority? or context class loader?
- *
- * thread.setName is not thread safe, so may not reflect
- * most up to date state
- */
- try {
- task = null;
- availableThreads.incrementAndGet();
- try {
- task = queue.poll(IDLE_TIMEOUT, TimeUnit.MILLISECONDS);
- } finally {
- availableThreads.decrementAndGet();
- }
- if (task != null) {
- thread.setName(NewThreadAction.NAME_PREFIX + task);
- task.run();
- thread.setName(NewThreadAction.NAME_PREFIX + "Idle");
- } else {
- break; //Timeout or spurious wakeup.
- }
- } catch (InterruptedException e){
- thread.interrupt();
- break;
- }
- }
- } finally {
- workerCount.decrementAndGet();
- }
+ public Thread newThread(Runnable r) {
+ return AccessController.doPrivileged(
+ new NewThreadAction(threadGroup, r, NewThreadAction.NAME_PREFIX, false, 228));
}
+
}
}