You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2011/08/19 19:21:00 UTC

svn commit: r1159696 - in /cxf/branches/2.4.x-fixes: ./ rt/core/src/main/java/org/apache/cxf/interceptor/ rt/core/src/main/java/org/apache/cxf/workqueue/ rt/core/src/test/java/org/apache/cxf/workqueue/ rt/ws/addr/src/main/java/org/apache/cxf/ws/address...

Author: dkulp
Date: Fri Aug 19 17:20:59 2011
New Revision: 1159696

URL: http://svn.apache.org/viewvc?rev=1159696&view=rev
Log:
Merged revisions 1159695 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/trunk

........
  r1159695 | dkulp | 2011-08-19 13:18:56 -0400 (Fri, 19 Aug 2011) | 6 lines
  
  [CXF-3750] Fix problem with CXF could lockup with a lot of long running
  one-way operations.
  Fix problems with OneWay and decoupled ws-addr not buffering the
  incoming stream properly
  Fix AutomaticWorkqueue to actually create threads as needed PRIOR to the
  queue filling completely up.
........

Modified:
    cxf/branches/2.4.x-fixes/   (props changed)
    cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java
    cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
    cxf/branches/2.4.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java
    cxf/branches/2.4.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java

Propchange: cxf/branches/2.4.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java?rev=1159696&r1=1159695&r2=1159696&view=diff
==============================================================================
--- cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java (original)
+++ cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java Fri Aug 19 17:20:59 2011
@@ -76,7 +76,7 @@ public class OneWayProcessorInterceptor 
                 //need to suck in all the data from the input stream as
                 //the transport might discard any data on the stream when this 
                 //thread unwinds or when the empty response is sent back
-                DelegatingInputStream in = message.get(DelegatingInputStream.class);
+                DelegatingInputStream in = message.getContent(DelegatingInputStream.class);
                 if (in != null) {
                     in.cacheInput();
                 }
@@ -102,12 +102,13 @@ public class OneWayProcessorInterceptor 
             if (Boolean.FALSE.equals(o)) {
                 chain.pause();
                 try {
-                    synchronized (chain) {
+                    final Object lock = new Object();
+                    synchronized (lock) {
                         message.getExchange().get(Bus.class).getExtension(WorkQueueManager.class)
                             .getAutomaticWorkQueue().execute(new Runnable() {
                                 public void run() {
-                                    synchronized (chain) {
-                                        chain.notifyAll();
+                                    synchronized (lock) {
+                                        lock.notifyAll();
                                     }
                                     chain.resume();
                                 }
@@ -115,7 +116,7 @@ public class OneWayProcessorInterceptor 
                         //wait a few milliseconds for the background thread to start processing
                         //Mostly just to make an attempt at keeping the ordering of the 
                         //messages coming in from a client.  Not guaranteed though.
-                        chain.wait(20);
+                        lock.wait(20);
                     }
                 } catch (RejectedExecutionException e) {
                     //the executor queue is full, so run the task in the caller thread

Modified: cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1159696&r1=1159695&r2=1159696&view=diff
==============================================================================
--- cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original)
+++ cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Fri Aug 19 17:20:59 2011
@@ -19,6 +19,8 @@
 
 package org.apache.cxf.workqueue;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.concurrent.DelayQueue;
@@ -30,6 +32,7 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -52,6 +55,9 @@ public class AutomaticWorkQueueImpl exte
     
     WorkQueueManagerImpl manager;
     String name = "default";
+    final int corePoolSize;
+    final int maxPoolSize;
+    final ReentrantLock mainLock;
 
     public AutomaticWorkQueueImpl() {
         this(DEFAULT_MAX_QUEUE_SIZE);
@@ -125,6 +131,19 @@ public class AutomaticWorkQueueImpl exte
         // start the watch dog thread
         watchDog.setDaemon(true);
         watchDog.start();
+        
+        corePoolSize = this.getCorePoolSize();
+        maxPoolSize = this.getMaximumPoolSize(); 
+        
+        ReentrantLock l = null;
+        try {
+            Field f = ThreadPoolExecutor.class.getDeclaredField("mainLock");
+            f.setAccessible(true);
+            l = (ReentrantLock)f.get(this);
+        } catch (Throwable t) {
+            l = new ReentrantLock();
+        }
+        mainLock = l;
     }
     private static ThreadFactory createThreadFactory(final String name) {
         ThreadGroup group;
@@ -337,7 +356,33 @@ public class AutomaticWorkQueueImpl exte
                 }
             }
         };
+        //The ThreadPoolExecutor in the JDK doesn't expand the number
+        //of threads until the queue is full.   However, we would 
+        //prefer the number of threads to expand immediately and 
+        //only uses the queue if we've reached the maximum number 
+        //of threads.   Thus, we'll set the core size to the max,
+        //add the runnable, and set back.  That will cause the
+        //threads to be created as needed.
         super.execute(r);
+        if (!getQueue().isEmpty() && this.getPoolSize() < maxPoolSize) {
+            mainLock.lock();
+            try {
+                int ps = this.getPoolSize();
+                int sz = getQueue().size();
+                int sz2 = this.getActiveCount();
+                
+                if ((sz + sz2) > ps) {
+                    Method m = ThreadPoolExecutor.class.getDeclaredMethod("addIfUnderMaximumPoolSize",
+                                                                          Runnable.class);
+                    m.setAccessible(true);
+                    m.invoke(this, new Object[1]);
+                }
+            } catch (Exception ex) {
+                //ignore
+            } finally {
+                mainLock.unlock();
+            }
+        }
     }
     
     // WorkQueue interface

Modified: cxf/branches/2.4.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.4.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java?rev=1159696&r1=1159695&r2=1159696&view=diff
==============================================================================
--- cxf/branches/2.4.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java (original)
+++ cxf/branches/2.4.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java Fri Aug 19 17:20:59 2011
@@ -281,7 +281,7 @@ public class AutomaticWorkQueueTest exte
 
         // Give threads a chance to dequeue (5sec max)
         int i = 0;
-        while (workqueue.getPoolSize() != 10 && i++ < 50) {
+        while (workqueue.getPoolSize() > 10 && i++ < 50) {
             try {
                 Thread.sleep(100);
             } catch (InterruptedException ie) {
@@ -300,7 +300,7 @@ public class AutomaticWorkQueueTest exte
     }
 
     @Test
-    public void testThreadPoolShrinkUnbounded() {
+    public void testThreadPoolShrinkUnbounded() throws Exception {
         workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
                                                UNBOUNDED_HIGH_WATER_MARK,
                                                DEFAULT_LOW_WATER_MARK, 100L);
@@ -311,18 +311,15 @@ public class AutomaticWorkQueueTest exte
         // Give threads a chance to dequeue (5sec max)
         int i = 0;
         int last = workqueue.getPoolSize();
-        while (workqueue.getPoolSize() != DEFAULT_LOW_WATER_MARK && i++ < 50) {
+        while (workqueue.getPoolSize() > DEFAULT_LOW_WATER_MARK && i++ < 50) {
             if (last != workqueue.getPoolSize()) {
                 last = workqueue.getPoolSize();
                 i = 0;
             }
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException ie) {
-                // ignore
-            }
+            Thread.sleep(100);
         }
-        assertTrue("threads_total()", workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK);
+        int sz = workqueue.getPoolSize();
+        assertTrue("threads_total(): " + sz, workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK);
     }
 
     @Test    

Modified: cxf/branches/2.4.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.4.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java?rev=1159696&r1=1159695&r2=1159696&view=diff
==============================================================================
--- cxf/branches/2.4.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java (original)
+++ cxf/branches/2.4.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java Fri Aug 19 17:20:59 2011
@@ -434,7 +434,7 @@ public final class ContextUtils {
                         //need to suck in all the data from the input stream as
                         //the transport might discard any data on the stream when this 
                         //thread unwinds or when the empty response is sent back
-                        DelegatingInputStream in = inMessage.get(DelegatingInputStream.class);
+                        DelegatingInputStream in = inMessage.getContent(DelegatingInputStream.class);
                         if (in != null) {
                             in.cacheInput();
                         }