You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/07/11 23:15:29 UTC
svn commit: r1360401 -
/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
Author: dkulp
Date: Wed Jul 11 21:15:28 2012
New Revision: 1360401
URL: http://svn.apache.org/viewvc?rev=1360401&view=rev
Log:
Updates to workqueue to work better with Java7
Modified:
cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
Modified: cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1360401&r1=1360400&r2=1360401&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Wed Jul 11 21:15:28 2012
@@ -63,8 +63,12 @@ public class AutomaticWorkQueueImpl impl
int lowWaterMark;
int highWaterMark;
long dequeueTimeout;
+ volatile int approxThreadCount;
ThreadPoolExecutor executor;
+ Method addWorkerMethod;
+ Object addWorkerArgs[];
+
AWQThreadFactory threadFactory;
ReentrantLock mainLock;
@@ -204,10 +208,27 @@ public class AutomaticWorkQueueImpl impl
l = new ReentrantLock();
}
mainLock = l;
+
+ try {
+ //java 5/6
+ addWorkerMethod = ThreadPoolExecutor.class.getDeclaredMethod("addIfUnderMaximumPoolSize",
+ Runnable.class);
+ addWorkerArgs = new Object[] {null};
+ } catch (Throwable t) {
+ try {
+ //java 7
+ addWorkerMethod = ThreadPoolExecutor.class.getDeclaredMethod("addWorker",
+ Runnable.class, Boolean.TYPE);
+ addWorkerArgs = new Object[] {null, Boolean.FALSE};
+ } catch (Throwable t2) {
+ //nothing we cando
+ }
+ }
+
}
return executor;
}
- private static AWQThreadFactory createThreadFactory(final String name) {
+ private AWQThreadFactory createThreadFactory(final String nm) {
ThreadGroup group;
try {
//Try and find the highest level ThreadGroup that we're allowed to use.
@@ -227,14 +248,14 @@ public class AutomaticWorkQueueImpl impl
//ignore - if we get here, the "group" is as high as
//the security manager will allow us to go. Use that one.
}
- return new ThreadGroup(group, name + "-workqueue");
+ return new ThreadGroup(group, nm + "-workqueue");
}
}
);
} catch (SecurityException e) {
- group = new ThreadGroup(name + "-workqueue");
+ group = new ThreadGroup(nm + "-workqueue");
}
- return new AWQThreadFactory(group, name);
+ return new AWQThreadFactory(group, nm);
}
static class DelayedTaskWrapper implements Delayed, Runnable {
@@ -306,7 +327,7 @@ public class AutomaticWorkQueueImpl impl
}
}
- static class AWQThreadFactory implements ThreadFactory {
+ class AWQThreadFactory implements ThreadFactory {
final AtomicInteger threadNumber = new AtomicInteger(1);
ThreadGroup group;
String name;
@@ -319,12 +340,22 @@ public class AutomaticWorkQueueImpl impl
loader = AutomaticWorkQueueImpl.class.getClassLoader();
}
- public Thread newThread(Runnable r) {
+ public Thread newThread(final Runnable r) {
if (group.isDestroyed()) {
group = new ThreadGroup(group.getParent(), name + "-workqueue");
}
+ Runnable wrapped = new Runnable() {
+ public void run() {
+ ++approxThreadCount;
+ try {
+ r.run();
+ } finally {
+ --approxThreadCount;
+ }
+ }
+ };
final Thread t = new Thread(group,
- r,
+ wrapped,
name + "-workqueue-" + threadNumber.getAndIncrement(),
0);
AccessController.doPrivileged(new PrivilegedAction<Boolean>() {
@@ -404,12 +435,12 @@ public class AutomaticWorkQueueImpl impl
//of threads until the queue is full. However, we would
//prefer the number of threads to expand immediately and
//only uses the queue if we've reached the maximum number
- //of threads. Thus, we'll set the core size to the max,
- //add the runnable, and set back. That will cause the
- //threads to be created as needed.
+ //of threads.
ThreadPoolExecutor ex = getExecutor();
ex.execute(r);
- if (!ex.getQueue().isEmpty() && this.getPoolSize() < highWaterMark) {
+ if (addWorkerMethod != null
+ && !ex.getQueue().isEmpty()
+ && this.approxThreadCount < highWaterMark) {
mainLock.lock();
try {
int ps = this.getPoolSize();
@@ -417,9 +448,7 @@ public class AutomaticWorkQueueImpl impl
int sz2 = this.getActiveCount();
if ((sz + sz2) > ps) {
- Method m = ThreadPoolExecutor.class.getDeclaredMethod("addIfUnderMaximumPoolSize",
- Runnable.class);
- ReflectionUtil.setAccessible(m).invoke(executor, new Object[1]);
+ ReflectionUtil.setAccessible(addWorkerMethod).invoke(executor, addWorkerArgs);
}
} catch (Exception exc) {
//ignore