You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/07/11 23:15:29 UTC

svn commit: r1360401 - /cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java

Author: dkulp
Date: Wed Jul 11 21:15:28 2012
New Revision: 1360401

URL: http://svn.apache.org/viewvc?rev=1360401&view=rev
Log:
Updates to workqueue to work better with Java7

Modified:
    cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1360401&r1=1360400&r2=1360401&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Wed Jul 11 21:15:28 2012
@@ -63,8 +63,12 @@ public class AutomaticWorkQueueImpl impl
     int lowWaterMark;
     int highWaterMark; 
     long dequeueTimeout;
+    volatile int approxThreadCount;
 
     ThreadPoolExecutor executor;
+    Method addWorkerMethod;
+    Object addWorkerArgs[];
+    
     AWQThreadFactory threadFactory;
     ReentrantLock mainLock;
     
@@ -204,10 +208,27 @@ public class AutomaticWorkQueueImpl impl
                 l = new ReentrantLock();
             }
             mainLock = l;
+
+            try {
+                //java 5/6
+                addWorkerMethod = ThreadPoolExecutor.class.getDeclaredMethod("addIfUnderMaximumPoolSize",
+                                                                             Runnable.class);
+                addWorkerArgs = new Object[] {null};
+            } catch (Throwable t) {
+                try {
+                    //java 7
+                    addWorkerMethod = ThreadPoolExecutor.class.getDeclaredMethod("addWorker",
+                                                                                 Runnable.class, Boolean.TYPE);
+                    addWorkerArgs = new Object[] {null, Boolean.FALSE};
+                } catch (Throwable t2) {
+                    //nothing we can do
+                }
+            }
+
         }
         return executor;
     }
-    private static AWQThreadFactory createThreadFactory(final String name) {
+    private AWQThreadFactory createThreadFactory(final String nm) {
         ThreadGroup group;
         try { 
             //Try and find the highest level ThreadGroup that we're allowed to use.
@@ -227,14 +248,14 @@ public class AutomaticWorkQueueImpl impl
                             //ignore - if we get here, the "group" is as high as 
                             //the security manager will allow us to go.   Use that one.
                         }
-                        return new ThreadGroup(group, name + "-workqueue");
+                        return new ThreadGroup(group, nm + "-workqueue");
                     } 
                 }
             );
         } catch (SecurityException e) { 
-            group = new ThreadGroup(name + "-workqueue");
+            group = new ThreadGroup(nm + "-workqueue");
         }
-        return new AWQThreadFactory(group, name);
+        return new AWQThreadFactory(group, nm);
     }
     
     static class DelayedTaskWrapper implements Delayed, Runnable {
@@ -306,7 +327,7 @@ public class AutomaticWorkQueueImpl impl
         }
         
     }
-    static class AWQThreadFactory implements ThreadFactory {
+    class AWQThreadFactory implements ThreadFactory {
         final AtomicInteger threadNumber = new AtomicInteger(1);
         ThreadGroup group;
         String name;
@@ -319,12 +340,22 @@ public class AutomaticWorkQueueImpl impl
             loader = AutomaticWorkQueueImpl.class.getClassLoader();
         }
         
-        public Thread newThread(Runnable r) {
+        public Thread newThread(final Runnable r) {
             if (group.isDestroyed()) {
                 group = new ThreadGroup(group.getParent(), name + "-workqueue");
             }
+            Runnable wrapped = new Runnable() {
+                public void run() {
+                    ++approxThreadCount;
+                    try {
+                        r.run();
+                    } finally {
+                        --approxThreadCount;
+                    }
+                }
+            };
             final Thread t = new Thread(group, 
-                                  r, 
+                                  wrapped, 
                                   name + "-workqueue-" + threadNumber.getAndIncrement(),
                                   0);
             AccessController.doPrivileged(new PrivilegedAction<Boolean>() {
@@ -404,12 +435,12 @@ public class AutomaticWorkQueueImpl impl
         //of threads until the queue is full.   However, we would 
         //prefer the number of threads to expand immediately and 
         //only uses the queue if we've reached the maximum number 
-        //of threads.   Thus, we'll set the core size to the max,
-        //add the runnable, and set back.  That will cause the
-        //threads to be created as needed.
+        //of threads.
         ThreadPoolExecutor ex = getExecutor();
         ex.execute(r);
-        if (!ex.getQueue().isEmpty() && this.getPoolSize() < highWaterMark) {
+        if (addWorkerMethod != null 
+            && !ex.getQueue().isEmpty() 
+            && this.approxThreadCount < highWaterMark) {
             mainLock.lock();
             try {
                 int ps = this.getPoolSize();
@@ -417,9 +448,7 @@ public class AutomaticWorkQueueImpl impl
                 int sz2 = this.getActiveCount();
                 
                 if ((sz + sz2) > ps) {
-                    Method m = ThreadPoolExecutor.class.getDeclaredMethod("addIfUnderMaximumPoolSize",
-                                                                          Runnable.class);
-                    ReflectionUtil.setAccessible(m).invoke(executor, new Object[1]);
+                    ReflectionUtil.setAccessible(addWorkerMethod).invoke(executor, addWorkerArgs);
                 }
             } catch (Exception exc) {
                 //ignore