You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/05/14 19:40:59 UTC

svn commit: r774851 - in /cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue: AutomaticWorkQueueImpl.java WorkQueueManagerImpl.java

Author: dkulp
Date: Thu May 14 17:40:59 2009
New Revision: 774851

URL: http://svn.apache.org/viewvc?rev=774851&view=rev
Log:
[CXF-2220] Attempt to fix classloader issues with the AutomaticWorkQueueImpl

Modified:
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=774851&r1=774850&r2=774851&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Thu May 14 17:40:59 2009
@@ -19,10 +19,14 @@
 
 package org.apache.cxf.workqueue;
 
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -43,30 +47,44 @@
     
     WorkQueueManagerImpl manager;
     String name = "default";
-    
 
     public AutomaticWorkQueueImpl() {
         this(DEFAULT_MAX_QUEUE_SIZE);
     }    
+    public AutomaticWorkQueueImpl(String name) {
+        this(DEFAULT_MAX_QUEUE_SIZE, name);
+    }    
     public AutomaticWorkQueueImpl(int max) {
+        this(max, "default");
+    }
+    public AutomaticWorkQueueImpl(int max, String name) {
         this(max,
              0,
              25,
              5,
-             2 * 60 * 1000L);
+             2 * 60 * 1000L,
+             name);
     }
-    
     public AutomaticWorkQueueImpl(int mqs, 
                                   int initialThreads, 
                                   int highWaterMark, 
                                   int lowWaterMark,
                                   long dequeueTimeout) {
+        this(mqs, initialThreads, highWaterMark, lowWaterMark, dequeueTimeout, "default");
+    }    
+    public AutomaticWorkQueueImpl(int mqs, 
+                                  int initialThreads, 
+                                  int highWaterMark, 
+                                  int lowWaterMark,
+                                  long dequeueTimeout,
+                                  String name) {
         
         super(-1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark, 
             -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark,
                 TimeUnit.MILLISECONDS.toMillis(dequeueTimeout), TimeUnit.MILLISECONDS, 
                 mqs == -1 ? new LinkedBlockingQueue<Runnable>(DEFAULT_MAX_QUEUE_SIZE)
-                    : new LinkedBlockingQueue<Runnable>(mqs));
+                    : new LinkedBlockingQueue<Runnable>(mqs),
+            createThreadFactory(name));
         
         maxQueueSize = mqs == -1 ? DEFAULT_MAX_QUEUE_SIZE : mqs;
         
@@ -99,6 +117,66 @@
             setCorePoolSize(lowWaterMark);
         }
     }
+    private static ThreadFactory createThreadFactory(final String name) {
+        ThreadGroup group;
+        try { 
+            //Try and find the highest level ThreadGroup that we're allowed to use.
+            //That SHOULD allow the default classloader and thread locals and such 
+            //to be the least likely to cause issues down the road.
+            group = AccessController.doPrivileged(
+                new PrivilegedAction<ThreadGroup>() { 
+                    public ThreadGroup run() { 
+                        ThreadGroup group = Thread.currentThread().getThreadGroup(); 
+                        ThreadGroup parent = group;
+                        try { 
+                            while (parent != null) { 
+                                group = parent;  
+                                parent = parent.getParent(); 
+                            } 
+                        } catch (SecurityException se) {
+                            //ignore - if we get here, the "group" is as high as 
+                            //the security manager will allow us to go.   Use that one.
+                        }
+                        return new ThreadGroup(group, name + "-workqueue"); 
+                    } 
+                }
+            );
+        } catch (SecurityException e) { 
+            group = new ThreadGroup(name + "-workqueue"); 
+        }
+        
+        return new AWQThreadFactory(group, name);
+    }
+    static class AWQThreadFactory implements ThreadFactory {
+        final AtomicInteger threadNumber = new AtomicInteger(1);
+        ThreadGroup group;
+        String name;
+        ClassLoader loader;
+        AWQThreadFactory(ThreadGroup gp, String nm) {
+            group = gp;
+            name = nm;
+            //force the loader to be the loader of CXF, not the application loader
+            loader = AutomaticWorkQueueImpl.class.getClassLoader();
+        }
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(group, 
+                                  r, 
+                                  name + "-workqueue-" + threadNumber.getAndIncrement(),
+                                  0);
+            t.setContextClassLoader(loader);
+            if (t.isDaemon()) {
+                t.setDaemon(false);
+            }
+            if (t.getPriority() != Thread.NORM_PRIORITY) {
+                t.setPriority(Thread.NORM_PRIORITY);
+            }
+            return t;
+        }
+        public void setName(String s) {
+            name = s;
+        }
+    }
+    
     @Resource(name = "org.apache.cxf.workqueue.WorkQueueManager")
     public void setManager(WorkQueueManagerImpl mgr) {
         manager = mgr;
@@ -109,6 +187,10 @@
 
     public void setName(String s) {
         name = s;
+        ThreadFactory factory = this.getThreadFactory();
+        if (factory instanceof AWQThreadFactory) {
+            ((AWQThreadFactory)factory).setName(s);
+        }
     }
     public String getName() {
         return name;
@@ -148,8 +230,26 @@
         return buf.toString();
     }
     
+    public void execute(final Runnable command) {
+        //Grab the context classloader of this thread.   We'll make sure we use that 
+        //on the thread the runnable actually runs on.
+        
+        final ClassLoader loader = Thread.currentThread().getContextClassLoader();
+        Runnable r = new Runnable() {
+            public void run() {
+                ClassLoader orig = Thread.currentThread().getContextClassLoader();
+                try {
+                    Thread.currentThread().setContextClassLoader(loader);
+                    command.run();
+                } finally {
+                    Thread.currentThread().setContextClassLoader(orig);
+                }
+            }
+        };
+        super.execute(r);
+    }
+    
     // WorkQueue interface
-     
     public void execute(Runnable work, long timeout) {
         try {
             execute(work);

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java?rev=774851&r1=774850&r2=774851&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java Thu May 14 17:40:59 2009
@@ -118,7 +118,7 @@
     }
     
     private AutomaticWorkQueue createAutomaticWorkQueue() {        
-        AutomaticWorkQueueImpl impl = new AutomaticWorkQueueImpl();
+        AutomaticWorkQueueImpl impl = new AutomaticWorkQueueImpl("default");
         impl.setManager(this);
         impl.register();
         return impl;