You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2011/08/19 19:21:00 UTC
svn commit: r1159696 - in /cxf/branches/2.4.x-fixes: ./
rt/core/src/main/java/org/apache/cxf/interceptor/
rt/core/src/main/java/org/apache/cxf/workqueue/
rt/core/src/test/java/org/apache/cxf/workqueue/
rt/ws/addr/src/main/java/org/apache/cxf/ws/address...
Author: dkulp
Date: Fri Aug 19 17:20:59 2011
New Revision: 1159696
URL: http://svn.apache.org/viewvc?rev=1159696&view=rev
Log:
Merged revisions 1159695 via svnmerge from
https://svn.apache.org/repos/asf/cxf/trunk
........
r1159695 | dkulp | 2011-08-19 13:18:56 -0400 (Fri, 19 Aug 2011) | 6 lines
[CXF-3750] Fix a problem where CXF could lock up when handling a lot of
long-running one-way operations.
Fix problems with OneWay and decoupled WS-Addressing not buffering the
incoming stream properly.
Fix AutomaticWorkQueue to actually create threads as needed BEFORE the
queue fills up completely.
........
Modified:
cxf/branches/2.4.x-fixes/ (props changed)
cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java
cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
cxf/branches/2.4.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java
cxf/branches/2.4.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java
Propchange: cxf/branches/2.4.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java?rev=1159696&r1=1159695&r2=1159696&view=diff
==============================================================================
--- cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java (original)
+++ cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java Fri Aug 19 17:20:59 2011
@@ -76,7 +76,7 @@ public class OneWayProcessorInterceptor
//need to suck in all the data from the input stream as
//the transport might discard any data on the stream when this
//thread unwinds or when the empty response is sent back
- DelegatingInputStream in = message.get(DelegatingInputStream.class);
+ DelegatingInputStream in = message.getContent(DelegatingInputStream.class);
if (in != null) {
in.cacheInput();
}
@@ -102,12 +102,13 @@ public class OneWayProcessorInterceptor
if (Boolean.FALSE.equals(o)) {
chain.pause();
try {
- synchronized (chain) {
+ final Object lock = new Object();
+ synchronized (lock) {
message.getExchange().get(Bus.class).getExtension(WorkQueueManager.class)
.getAutomaticWorkQueue().execute(new Runnable() {
public void run() {
- synchronized (chain) {
- chain.notifyAll();
+ synchronized (lock) {
+ lock.notifyAll();
}
chain.resume();
}
@@ -115,7 +116,7 @@ public class OneWayProcessorInterceptor
//wait a few milliseconds for the background thread to start processing
//Mostly just to make an attempt at keeping the ordering of the
//messages coming in from a client. Not guaranteed though.
- chain.wait(20);
+ lock.wait(20);
}
} catch (RejectedExecutionException e) {
//the executor queue is full, so run the task in the caller thread
Modified: cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1159696&r1=1159695&r2=1159696&view=diff
==============================================================================
--- cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original)
+++ cxf/branches/2.4.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Fri Aug 19 17:20:59 2011
@@ -19,6 +19,8 @@
package org.apache.cxf.workqueue;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.DelayQueue;
@@ -30,6 +32,7 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -52,6 +55,9 @@ public class AutomaticWorkQueueImpl exte
WorkQueueManagerImpl manager;
String name = "default";
+ final int corePoolSize;
+ final int maxPoolSize;
+ final ReentrantLock mainLock;
public AutomaticWorkQueueImpl() {
this(DEFAULT_MAX_QUEUE_SIZE);
@@ -125,6 +131,19 @@ public class AutomaticWorkQueueImpl exte
// start the watch dog thread
watchDog.setDaemon(true);
watchDog.start();
+
+ corePoolSize = this.getCorePoolSize();
+ maxPoolSize = this.getMaximumPoolSize();
+
+ ReentrantLock l = null;
+ try {
+ Field f = ThreadPoolExecutor.class.getDeclaredField("mainLock");
+ f.setAccessible(true);
+ l = (ReentrantLock)f.get(this);
+ } catch (Throwable t) {
+ l = new ReentrantLock();
+ }
+ mainLock = l;
}
private static ThreadFactory createThreadFactory(final String name) {
ThreadGroup group;
@@ -337,7 +356,33 @@ public class AutomaticWorkQueueImpl exte
}
}
};
+ //The ThreadPoolExecutor in the JDK doesn't expand the number
+ //of threads until the queue is full. However, we would
+ //prefer the number of threads to expand immediately and
+ //only use the queue if we've reached the maximum number
+ //of threads. Thus, we'll set the core size to the max,
+ //add the runnable, and set back. That will cause the
+ //threads to be created as needed.
super.execute(r);
+ if (!getQueue().isEmpty() && this.getPoolSize() < maxPoolSize) {
+ mainLock.lock();
+ try {
+ int ps = this.getPoolSize();
+ int sz = getQueue().size();
+ int sz2 = this.getActiveCount();
+
+ if ((sz + sz2) > ps) {
+ Method m = ThreadPoolExecutor.class.getDeclaredMethod("addIfUnderMaximumPoolSize",
+ Runnable.class);
+ m.setAccessible(true);
+ m.invoke(this, new Object[1]);
+ }
+ } catch (Exception ex) {
+ //ignore
+ } finally {
+ mainLock.unlock();
+ }
+ }
}
// WorkQueue interface
Modified: cxf/branches/2.4.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.4.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java?rev=1159696&r1=1159695&r2=1159696&view=diff
==============================================================================
--- cxf/branches/2.4.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java (original)
+++ cxf/branches/2.4.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java Fri Aug 19 17:20:59 2011
@@ -281,7 +281,7 @@ public class AutomaticWorkQueueTest exte
// Give threads a chance to dequeue (5sec max)
int i = 0;
- while (workqueue.getPoolSize() != 10 && i++ < 50) {
+ while (workqueue.getPoolSize() > 10 && i++ < 50) {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
@@ -300,7 +300,7 @@ public class AutomaticWorkQueueTest exte
}
@Test
- public void testThreadPoolShrinkUnbounded() {
+ public void testThreadPoolShrinkUnbounded() throws Exception {
workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
UNBOUNDED_HIGH_WATER_MARK,
DEFAULT_LOW_WATER_MARK, 100L);
@@ -311,18 +311,15 @@ public class AutomaticWorkQueueTest exte
// Give threads a chance to dequeue (5sec max)
int i = 0;
int last = workqueue.getPoolSize();
- while (workqueue.getPoolSize() != DEFAULT_LOW_WATER_MARK && i++ < 50) {
+ while (workqueue.getPoolSize() > DEFAULT_LOW_WATER_MARK && i++ < 50) {
if (last != workqueue.getPoolSize()) {
last = workqueue.getPoolSize();
i = 0;
}
- try {
- Thread.sleep(100);
- } catch (InterruptedException ie) {
- // ignore
- }
+ Thread.sleep(100);
}
- assertTrue("threads_total()", workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK);
+ int sz = workqueue.getPoolSize();
+ assertTrue("threads_total(): " + sz, workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK);
}
@Test
Modified: cxf/branches/2.4.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.4.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java?rev=1159696&r1=1159695&r2=1159696&view=diff
==============================================================================
--- cxf/branches/2.4.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java (original)
+++ cxf/branches/2.4.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java Fri Aug 19 17:20:59 2011
@@ -434,7 +434,7 @@ public final class ContextUtils {
//need to suck in all the data from the input stream as
//the transport might discard any data on the stream when this
//thread unwinds or when the empty response is sent back
- DelegatingInputStream in = inMessage.get(DelegatingInputStream.class);
+ DelegatingInputStream in = inMessage.getContent(DelegatingInputStream.class);
if (in != null) {
in.cacheInput();
}