You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2008/08/05 23:43:29 UTC

svn commit: r682970 - in /cxf/branches/2.0.x-fixes: ./ api/src/main/java/org/apache/cxf/workqueue/ common/common/src/main/java/org/apache/cxf/staxutils/ rt/core/src/main/java/org/apache/cxf/bus/ rt/core/src/main/java/org/apache/cxf/bus/spring/ rt/core/...

Author: dkulp
Date: Tue Aug  5 14:43:28 2008
New Revision: 682970

URL: http://svn.apache.org/viewvc?rev=682970&view=rev
Log:
Merged revisions 682902 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/trunk

........
  r682902 | dkulp | 2008-08-05 15:43:28 -0400 (Tue, 05 Aug 2008) | 9 lines
  
  [CXF-1734] Make the default workqueue configurable
  Allow for "named" workqueues
  Update the JMS transport to check for particular named workqueues (endpoint qname or "jms") 
  so that queues can be configured on a per-endpoint basis or for all other JMS endpoints
  Update extension checking in the Bus to search for extensions in the context that aren't 
  yet loaded. Stuff loaded before the InstrumentationManager was not getting properly 
  instrumented or registered.
........

Added:
    cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueImplMBeanWrapper.java
      - copied unchanged from r682902, cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueImplMBeanWrapper.java
Modified:
    cxf/branches/2.0.x-fixes/   (props changed)
    cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java
    cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/staxutils/StaxUtils.java
    cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/bus/CXFBusImpl.java
    cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/bus/spring/BusExtensionPostProcessor.java
    cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
    cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java
    cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImplMBeanWrapper.java
    cxf/branches/2.0.x-fixes/rt/databinding/jaxb/src/main/java/org/apache/cxf/jaxb/JAXBDataBinding.java
    cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/ServiceImplTest.java
    cxf/branches/2.0.x-fixes/rt/management/src/main/resources/META-INF/cxf/cxf-extension-management.xml
    cxf/branches/2.0.x-fixes/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java
    cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/CountersClientServerTest.java
    cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/ManagedBusTest.java
    cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/ManagedClientServerTest.java
    cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/managed-spring.xml

Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Aug  5 14:43:28 2008
@@ -1 +1 @@
-/cxf/trunk:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816
+/cxf/trunk:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902

Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java (original)
+++ cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java Tue Aug  5 14:43:28 2008
@@ -20,29 +20,24 @@
 
 public interface WorkQueueManager {
 
-    enum ThreadingModel {
-        SINGLE_THREADED, MULTI_THREADED
-    };
-
     /**
-     * Get the manager's work queue.
+     * Get the manager's default work queue.
      * @return AutomaticWorkQueue
      */
     AutomaticWorkQueue getAutomaticWorkQueue();
 
     /**
-     * Get the threading model.
-     * @return ThreadingModel - either <code>SINGLE_THREADED</code>
-     * or <code>MULTI_THREADED</code>.
+     * Get the named work queue.
+     * @return AutomaticWorkQueue
      */
-    ThreadingModel getThreadingModel();
-
+    AutomaticWorkQueue getNamedWorkQueue(String name);
+    
     /**
-     * Set the threading model.
-     * @param model either <code>SINGLE_THREADED</code>
-     * or <code>MULTI_THREADED</code>.
+     * Adds a named work queue
+     * @param name
+     * @param q
      */
-    void setThreadingModel(ThreadingModel model);
+    void addNamedWorkQueue(String name, AutomaticWorkQueue q);
     
     /**
      * Shuts down the manager's work queue. If

Modified: cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/staxutils/StaxUtils.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/staxutils/StaxUtils.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/staxutils/StaxUtils.java (original)
+++ cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/staxutils/StaxUtils.java Tue Aug  5 14:43:28 2008
@@ -454,7 +454,7 @@
 
     /**
      * Writes an Element to an XMLStreamWriter. The writer must already have
-     * started the doucment (via writeStartDocument()). Also, this probably
+     * started the document (via writeStartDocument()). Also, this probably
      * won't work with just a fragment of a document. The Element should be the
      * root element of the document.
      * 
@@ -475,7 +475,7 @@
      * 
      * @param e
      * @param writer
-     * @param endElement true iff the element should be ended
+     * @param endElement true if the element should be ended
      * @throws XMLStreamException
      */
     public static void writeElement(Element e,

Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/bus/CXFBusImpl.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/bus/CXFBusImpl.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/bus/CXFBusImpl.java (original)
+++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/bus/CXFBusImpl.java Tue Aug  5 14:43:28 2008
@@ -35,6 +35,7 @@
     private String id;
     private BusState state;      
     private Collection<AbstractFeature> features;
+    private ExtensionFinder finder;
     
     public CXFBusImpl() {
         this(null);
@@ -57,7 +58,9 @@
         this.state = state;
     }
     
-
+    public void setExtensionFinder(ExtensionFinder f) {
+        finder = f;
+    }
     
     public void setId(String i) {
         id = i;
@@ -65,6 +68,9 @@
 
     public final <T> T getExtension(Class<T> extensionType) {
         Object obj = extensions.get(extensionType);
+        if (obj == null && finder != null) {
+            obj = finder.findExtension(extensionType);
+        }
         if (null != obj) {
             return extensionType.cast(obj);
         }
@@ -126,7 +132,7 @@
             BusFactory.setDefaultBus(null);
         }
     }
-    
+
     protected BusState getState() {
         return state;
     }
@@ -143,4 +149,7 @@
         }
     }
     
+    public interface ExtensionFinder {
+        <T> T findExtension(Class<T> cls);
+    }
 }

Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/bus/spring/BusExtensionPostProcessor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/bus/spring/BusExtensionPostProcessor.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/bus/spring/BusExtensionPostProcessor.java (original)
+++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/bus/spring/BusExtensionPostProcessor.java Tue Aug  5 14:43:28 2008
@@ -20,9 +20,11 @@
 package org.apache.cxf.bus.spring;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.bus.CXFBusImpl;
 import org.apache.cxf.extension.BusExtension;
 
 import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
@@ -58,6 +60,20 @@
     private Bus getBus() {
         if (bus == null) {
             bus = (Bus)context.getBean(Bus.DEFAULT_BUS_ID);
+            
+            final ApplicationContext ctx = context;
+            if (bus instanceof CXFBusImpl) {
+                CXFBusImpl b = (CXFBusImpl)bus;
+                b.setExtensionFinder(new CXFBusImpl.ExtensionFinder() {
+                    public <T> T findExtension(Class<T> cls) {
+                        try {
+                            return cls.cast(ctx.getBean(cls.getName(), cls));
+                        } catch (NoSuchBeanDefinitionException ex) {
+                            return null;
+                        }
+                    }
+                });
+            }
         }
         return bus;
     }

Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original)
+++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Tue Aug  5 14:43:28 2008
@@ -19,33 +19,58 @@
 
 package org.apache.cxf.workqueue;
 
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import javax.management.JMException;
+
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.management.InstrumentationManager;
 
 public class AutomaticWorkQueueImpl extends ThreadPoolExecutor implements AutomaticWorkQueue {
 
-    static final int DEFAULT_MAX_QUEUE_SIZE = 128;
+    static final int DEFAULT_MAX_QUEUE_SIZE = 256;
     private static final Logger LOG =
         LogUtils.getL7dLogger(AutomaticWorkQueueImpl.class);
     
     int maxQueueSize;
+    
+    WorkQueueManagerImpl manager;
+    String name = "default";
+    
 
-    AutomaticWorkQueueImpl(int mqs, int initialThreads, int highWaterMark, int lowWaterMark,
-                           long dequeueTimeout) {
+    public AutomaticWorkQueueImpl() {
+        this(DEFAULT_MAX_QUEUE_SIZE);
+    }    
+    public AutomaticWorkQueueImpl(int max) {
+        this(max,
+             0,
+             25,
+             5,
+             2 * 60 * 1000L);
+    }
+    
+    public AutomaticWorkQueueImpl(int mqs, 
+                                  int initialThreads, 
+                                  int highWaterMark, 
+                                  int lowWaterMark,
+                                  long dequeueTimeout) {
         
         super(-1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark, 
             -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark,
                 TimeUnit.MILLISECONDS.toMillis(dequeueTimeout), TimeUnit.MILLISECONDS, 
-                mqs == -1 ? new ArrayBlockingQueue<Runnable>(DEFAULT_MAX_QUEUE_SIZE)
-                    : new ArrayBlockingQueue<Runnable>(mqs));
+                mqs == -1 ? new LinkedBlockingQueue<Runnable>(DEFAULT_MAX_QUEUE_SIZE)
+                    : new LinkedBlockingQueue<Runnable>(mqs));
         
         maxQueueSize = mqs == -1 ? DEFAULT_MAX_QUEUE_SIZE : mqs;
+        
+        
         lowWaterMark = -1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark;
         highWaterMark = -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark;
                 
@@ -74,7 +99,36 @@
             setCorePoolSize(lowWaterMark);
         }
     }
+    @Resource(name = "org.apache.cxf.workqueue.WorkQueueManager")
+    public void setManager(WorkQueueManagerImpl mgr) {
+        manager = mgr;
+    }
+    public WorkQueueManager getManager() {
+        return manager;
+    }
 
+    public void setName(String s) {
+        name = s;
+    }
+    public String getName() {
+        return name;
+    }
+    
+    @PostConstruct
+    public void register() {
+        if (manager != null) {
+            manager.addNamedWorkQueue(name, this);
+            InstrumentationManager imanager = manager.getBus().getExtension(InstrumentationManager.class);
+            if (null != imanager) {
+                try {
+                    imanager.register(new WorkQueueImplMBeanWrapper(this));
+                } catch (JMException jmex) {
+                    LOG.log(Level.WARNING , jmex.getMessage(), jmex);
+                }
+            }
+        }
+    }
+    
     public String toString() {
         StringBuffer buf = new StringBuffer();
         buf.append(super.toString());
@@ -136,7 +190,7 @@
      * Gets the maximum size (capacity) of the backing queue.
      * @return the maximum size (capacity) of the backing queue.
      */
-    long getMaxSize() {
+    public long getMaxSize() {
         return maxQueueSize;
     }
 
@@ -157,21 +211,21 @@
         return getQueue().remainingCapacity() == 0;
     }
 
-    int getHighWaterMark() {
+    public int getHighWaterMark() {
         int hwm = getMaximumPoolSize();
         return hwm == Integer.MAX_VALUE ? -1 : hwm;
     }
 
-    int getLowWaterMark() {
+    public int getLowWaterMark() {
         int lwm = getCorePoolSize();
         return lwm == Integer.MAX_VALUE ? -1 : lwm;
     }
 
-    void setHighWaterMark(int hwm) {
+    public void setHighWaterMark(int hwm) {
         setMaximumPoolSize(hwm < 0 ? Integer.MAX_VALUE : hwm);
     }
 
-    void setLowWaterMark(int lwm) {
+    public void setLowWaterMark(int lwm) {
         setCorePoolSize(lwm < 0 ? 0 : lwm);
     }
 }

Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java (original)
+++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java Tue Aug  5 14:43:28 2008
@@ -19,6 +19,8 @@
 
 package org.apache.cxf.workqueue;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -35,8 +37,8 @@
     private static final Logger LOG =
         LogUtils.getL7dLogger(WorkQueueManagerImpl.class);
 
-    ThreadingModel threadingModel = ThreadingModel.MULTI_THREADED;
-    AutomaticWorkQueue autoQueue;
+    Map<String, AutomaticWorkQueue> namedQueues 
+        = new ConcurrentHashMap<String, AutomaticWorkQueue>();
     boolean inShutdown;
     Bus bus;  
     
@@ -53,12 +55,6 @@
     public void register() {
         if (null != bus) {
             bus.setExtension(this, WorkQueueManager.class);
-        }
-    }
-
-    public synchronized AutomaticWorkQueue getAutomaticWorkQueue() {
-        if (autoQueue == null) {
-            autoQueue = createAutomaticWorkQueue();
             InstrumentationManager manager = bus.getExtension(InstrumentationManager.class);
             if (null != manager) {
                 try {
@@ -68,22 +64,20 @@
                 }
             }
         }
-        
-        return autoQueue;
-    }
-
-    public ThreadingModel getThreadingModel() {
-        return threadingModel;
     }
 
-    public void setThreadingModel(ThreadingModel model) {
-        threadingModel = model;
+    public synchronized AutomaticWorkQueue getAutomaticWorkQueue() {
+        AutomaticWorkQueue defaultQueue = getNamedWorkQueue("default");
+        if (defaultQueue == null) {
+            defaultQueue = createAutomaticWorkQueue();
+        }
+        return defaultQueue;
     }
 
     public synchronized void shutdown(boolean processRemainingTasks) {
         inShutdown = true;
-        if (autoQueue != null) {
-            autoQueue.shutdown(processRemainingTasks);
+        for (AutomaticWorkQueue q : namedQueues.values()) {
+            q.shutdown(processRemainingTasks);
         }
 
         synchronized (this) {
@@ -100,11 +94,13 @@
                     // ignore
                 }
             }
-            while (autoQueue != null && !autoQueue.isShutdown()) {
-                try {            
-                    Thread.sleep(100);
-                } catch (InterruptedException ex) {
-                    // ignore
+            for (AutomaticWorkQueue q : namedQueues.values()) {
+                while (!q.isShutdown()) {
+                    try {            
+                        Thread.sleep(100);
+                    } catch (InterruptedException ex) {
+                        // ignore
+                    }
                 }
             }
         }
@@ -114,27 +110,18 @@
         
     }
 
-    private AutomaticWorkQueue createAutomaticWorkQueue() {        
-      
-        // Configuration configuration = bus.getConfiguration();
-
-        // configuration.getInteger("threadpool:initial_threads");
-        int initialThreads = 1;
-
-        // int lwm = configuration.getInteger("threadpool:low_water_mark");
-        int lwm = 5;
-
-        // int hwm = configuration.getInteger("threadpool:high_water_mark");
-        int hwm = 25;
-
-        // configuration.getInteger("threadpool:max_queue_size");
-        int maxQueueSize = 10 * hwm;
-
-        // configuration.getInteger("threadpool:dequeue_timeout");
-        long dequeueTimeout = 2 * 60 * 1000L;
-
-        return new AutomaticWorkQueueImpl(maxQueueSize, initialThreads, hwm, lwm, dequeueTimeout);
-               
+    public AutomaticWorkQueue getNamedWorkQueue(String name) {
+        return namedQueues.get(name);
+    }
+    public void addNamedWorkQueue(String name, AutomaticWorkQueue q) {
+        namedQueues.put(name, q);
     }
     
+    private AutomaticWorkQueue createAutomaticWorkQueue() {        
+        AutomaticWorkQueueImpl impl = new AutomaticWorkQueueImpl();
+        impl.setManager(this);
+        impl.register();
+        return impl;       
+    }
+
 }

Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImplMBeanWrapper.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImplMBeanWrapper.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImplMBeanWrapper.java (original)
+++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImplMBeanWrapper.java Tue Aug  5 14:43:28 2008
@@ -25,97 +25,30 @@
 import org.apache.cxf.Bus;
 import org.apache.cxf.management.ManagedComponent;
 import org.apache.cxf.management.ManagementConstants;
-import org.apache.cxf.management.annotation.ManagedAttribute;
 import org.apache.cxf.management.annotation.ManagedOperation;
 import org.apache.cxf.management.annotation.ManagedResource;
-import org.apache.cxf.workqueue.WorkQueueManager.ThreadingModel;
 
-@ManagedResource(componentName = "WorkQueue", 
-                 description = "The CXF internal thread pool for manangement ", 
+@ManagedResource(componentName = "WorkQueueManager", 
+                 description = "The CXF manangement of work queues ", 
                  currencyTimeLimit = 15, persistPolicy = "OnUpdate", persistPeriod = 200)
                  
 public class WorkQueueManagerImplMBeanWrapper implements ManagedComponent {    
-    private static final String NAME_VALUE = "Bus.WorkQueue";
-    private static final String TYPE_VALUE = "WorkQueueMBean";
+    static final String NAME_VALUE = "Bus.WorkQueueManager";
+    static final String TYPE_VALUE = "WorkQueueManagerMBean";
     
     private WorkQueueManagerImpl wqManager;
-    private AutomaticWorkQueueImpl aWorkQueue;
     private Bus bus;
     
     public WorkQueueManagerImplMBeanWrapper(WorkQueueManagerImpl wq) {
         wqManager = wq;        
         bus = wq.getBus();
-        if (wqManager.autoQueue != null 
-            && AutomaticWorkQueueImpl.class.isAssignableFrom(wqManager.autoQueue.getClass())) {
-            aWorkQueue = (AutomaticWorkQueueImpl)wqManager.autoQueue;
-        }
     }
-   
     
     @ManagedOperation(currencyTimeLimit = 30)
     public void shutdown(boolean processRemainingWorkItems) {
         wqManager.shutdown(processRemainingWorkItems); 
     }
-    
-    @ManagedAttribute(description = "The thread pool work model",                      
-                      defaultValue = "SINGLE_THREADED",
-                      persistPolicy = "OnUpdate")
-                      
-    public String getThreadingModel() {        
-        return wqManager.getThreadingModel().toString();
-    }
-
-    public void setThreadingModel(String model) {
-        if (model.compareTo("SINGLE_THREADED") == 0) {
-            wqManager.setThreadingModel(ThreadingModel.SINGLE_THREADED);
-        }
-        if (model.compareTo("MULTI_THREADED") == 0) {
-            wqManager.setThreadingModel(ThreadingModel.MULTI_THREADED);
-        }             
-    }
-   
-    @ManagedAttribute(description = "The WorkQueueMaxSize",
-                      persistPolicy = "OnUpdate")
-    public long getWorkQueueMaxSize() {
-        return aWorkQueue.getMaxSize();
-    }
-   
-    @ManagedAttribute(description = "The WorkQueue Current size",
-                      persistPolicy = "OnUpdate")
-    public long getWorkQueueSize() {
-        return aWorkQueue.getSize();
-    }
-
-    @ManagedAttribute(description = "The WorkQueue has nothing to do",
-                      persistPolicy = "OnUpdate")
-    public boolean isEmpty() {
-        return aWorkQueue.isEmpty();
-    }
-
-    @ManagedAttribute(description = "The WorkQueue is very busy")
-    public boolean isFull() {
-        return aWorkQueue.isFull();
-    }
-
-    @ManagedAttribute(description = "The WorkQueue HighWaterMark",
-                      persistPolicy = "OnUpdate")
-    public int getHighWaterMark() {
-        return aWorkQueue.getHighWaterMark();
-    }
-    public void setHighWaterMark(int hwm) {
-        aWorkQueue.setHighWaterMark(hwm);
-    }
-
-    @ManagedAttribute(description = "The WorkQueue LowWaterMark",
-                      persistPolicy = "OnUpdate")
-    public int getLowWaterMark() {
-        return aWorkQueue.getLowWaterMark();
-    }
-
-    public void setLowWaterMark(int lwm) {
-        aWorkQueue.setLowWaterMark(lwm);
-    }
-
+      
     public ObjectName getObjectName() throws JMException {
         
         String busId = bus.getId();        

Modified: cxf/branches/2.0.x-fixes/rt/databinding/jaxb/src/main/java/org/apache/cxf/jaxb/JAXBDataBinding.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/databinding/jaxb/src/main/java/org/apache/cxf/jaxb/JAXBDataBinding.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/databinding/jaxb/src/main/java/org/apache/cxf/jaxb/JAXBDataBinding.java (original)
+++ cxf/branches/2.0.x-fixes/rt/databinding/jaxb/src/main/java/org/apache/cxf/jaxb/JAXBDataBinding.java Tue Aug  5 14:43:28 2008
@@ -246,7 +246,6 @@
             return;
         }
 
-        CachedContextAndSchemas cachedContextAndSchemas = null;
 
         contextClasses = new LinkedHashSet<Class<?>>();
         for (ServiceInfo serviceInfo : service.getServiceInfos()) {
@@ -260,35 +259,37 @@
         }
 
         String tns = service.getName().getNamespaceURI();
+        CachedContextAndSchemas cachedContextAndSchemas = null;
         JAXBContext ctx = null;
         try {
             if (service.getServiceInfos().size() > 0) {
                 tns = service.getServiceInfos().get(0).getInterface().getName().getNamespaceURI();
             }
-            ctx = createJAXBContext(contextClasses, tns);
-            cachedContextAndSchemas = JAXBCONTEXT_CACHE.get(contextClasses);
+            cachedContextAndSchemas = createJAXBContextAndSchemas(contextClasses, tns);
         } catch (JAXBException e1) {
             // load jaxb needed class and try to create jaxb context for more
             // times
             boolean added = addJaxbObjectFactory(e1);
-            while (ctx == null && added) {
+            while (cachedContextAndSchemas == null && added) {
                 try {
-                    synchronized (JAXBCONTEXT_CACHE) {
-                        ctx = JAXBContext.newInstance(contextClasses
-                            .toArray(new Class[contextClasses.size()]), null);
-                        cachedContextAndSchemas = new CachedContextAndSchemas(ctx);
-                        JAXBCONTEXT_CACHE.put(contextClasses, cachedContextAndSchemas);
-                    }
+                    ctx = JAXBContext.newInstance(contextClasses
+                                                  .toArray(new Class[contextClasses.size()]), null);
+                    cachedContextAndSchemas = new CachedContextAndSchemas(ctx);
                 } catch (JAXBException e) {
                     e1 = e;
                     added = addJaxbObjectFactory(e1);
                 }
             }
+
             if (ctx == null) {
                 throw new ServiceConstructionException(e1);
+            } else {
+                synchronized (JAXBCONTEXT_CACHE) {
+                    JAXBCONTEXT_CACHE.put(contextClasses, cachedContextAndSchemas);
+                }                
             }
         }
-
+        ctx = cachedContextAndSchemas.getContext();
         if (LOG.isLoggable(Level.FINE)) {
             LOG.log(Level.FINE, "CREATED_JAXB_CONTEXT", new Object[] {ctx, contextClasses});
         }
@@ -394,6 +395,13 @@
     }
 
     public JAXBContext createJAXBContext(Set<Class<?>> classes, String defaultNs) throws JAXBException {
+        return createJAXBContextAndSchemas(classes, defaultNs).getContext();
+    }
+    
+    public CachedContextAndSchemas createJAXBContextAndSchemas(Set<Class<?>> classes,
+                                                               String defaultNs) 
+        throws JAXBException {
+        
         // add user extra class into jaxb context
         if (extraClass != null && extraClass.length > 0) {
             for (Class clz : extraClass) {
@@ -502,15 +510,17 @@
 
         CachedContextAndSchemas cachedContextAndSchemas = null;
         synchronized (JAXBCONTEXT_CACHE) {
-            if (!JAXBCONTEXT_CACHE.containsKey(classes)) {
-                JAXBContext ctx = JAXBContext.newInstance(classes.toArray(new Class[classes.size()]), map);
-                cachedContextAndSchemas = new CachedContextAndSchemas(ctx);
+            cachedContextAndSchemas = JAXBCONTEXT_CACHE.get(classes);
+        }
+        if (cachedContextAndSchemas == null) {
+            JAXBContext ctx = JAXBContext.newInstance(classes.toArray(new Class[classes.size()]), map);
+            cachedContextAndSchemas = new CachedContextAndSchemas(ctx);
+            synchronized (JAXBCONTEXT_CACHE) {
                 JAXBCONTEXT_CACHE.put(classes, cachedContextAndSchemas);
             }
-            cachedContextAndSchemas = JAXBCONTEXT_CACHE.get(classes);
         }
 
-        return cachedContextAndSchemas.getContext();
+        return cachedContextAndSchemas;
     }
 
     private void addToObjectFactoryCache(Package objectFactoryPkg, Class<?> ofactory) {

Modified: cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/ServiceImplTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/ServiceImplTest.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/ServiceImplTest.java (original)
+++ cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/ServiceImplTest.java Tue Aug  5 14:43:28 2008
@@ -173,6 +173,9 @@
     
     @Test
     public void testJAXBCachedOnRepeatGetPort() {
+        System.gc();
+        System.gc();
+        
         URL wsdl1 = getClass().getResource("/wsdl/calculator.wsdl");
         assertNotNull(wsdl1);
         

Modified: cxf/branches/2.0.x-fixes/rt/management/src/main/resources/META-INF/cxf/cxf-extension-management.xml
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/management/src/main/resources/META-INF/cxf/cxf-extension-management.xml?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/management/src/main/resources/META-INF/cxf/cxf-extension-management.xml (original)
+++ cxf/branches/2.0.x-fixes/rt/management/src/main/resources/META-INF/cxf/cxf-extension-management.xml Tue Aug  5 14:43:28 2008
@@ -23,7 +23,7 @@
        xsi:schemaLocation="
 http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
         
-    <bean id="org.apache.cxf.management.jmx.InstrumentationManagerImpl"
+    <bean id="org.apache.cxf.management.InstrumentationManager"
 	  class="org.apache.cxf.management.jmx.InstrumentationManagerImpl">
         <property name="bus" ref="cxf"/>
         <property name="enabled" value="false"/>

Modified: cxf/branches/2.0.x-fixes/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java (original)
+++ cxf/branches/2.0.x-fixes/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java Tue Aug  5 14:43:28 2008
@@ -79,7 +79,7 @@
             ObjectName n = (ObjectName)it.next();            
             Long result = 
                 (Long)mbs.invoke(n, "getWorkQueueMaxSize", new Object[0], new String[0]);            
-            assertEquals(result, Long.valueOf(250));
+            assertEquals(result, Long.valueOf(256));
         }
 
         bus.shutdown(true);

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Tue Aug  5 14:43:28 2008
@@ -41,6 +41,7 @@
 import javax.jms.QueueSender;
 import javax.jms.TextMessage;
 import javax.naming.NamingException;
+import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
@@ -113,7 +114,9 @@
             JMSProviderHub.connect(this, serverConfig, runtimePolicy);
             //Get a non-pooled session. 
             listenerSession = base.sessionFactory.get(base.targetDestination);
-            listenerThread = new JMSListenerThread(listenerSession);
+            listenerThread = new JMSListenerThread(listenerSession,
+                                                   getEndpointInfo() == null ? null 
+                                                       : getEndpointInfo().getName());
             listenerThread.start();
         } catch (JMSException ex) {
             getLogger().log(Level.SEVERE, "JMS connect failed with JMSException : ", ex);
@@ -269,13 +272,31 @@
     
     protected class JMSListenerThread extends Thread {
         private final PooledSession listenSession;
-
-        public JMSListenerThread(PooledSession session) {
+        private final QName name;
+        public JMSListenerThread(PooledSession session, QName n) {
             listenSession = session;
+            name = n;
         }
 
         public void run() {
             try {
+                Executor executor = null;
+                if (executor == null) {
+                    WorkQueueManager wqm =
+                        base.bus.getExtension(WorkQueueManager.class);
+                    if (null != wqm) {
+                        if (name != null) {
+                            executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}" 
+                                                             + name.getLocalPart());
+                        }                        
+                        if (executor == null) {
+                            executor = wqm.getNamedWorkQueue("jms");
+                        }
+                        if (executor == null) {
+                            executor = wqm.getAutomaticWorkQueue();
+                        }
+                    }    
+                }
                 while (true) {
                     javax.jms.Message message = listenSession.consumer().receive();                   
                     if (message == null) {
@@ -287,14 +308,6 @@
                     while (message != null) {
                         //REVISIT  to get the thread pool                        
                         //Executor executor = jmsDestination.callback.getExecutor();
-                        Executor executor = null;
-                        if (executor == null) {
-                            WorkQueueManager wqm =
-                                base.bus.getExtension(WorkQueueManager.class);
-                            if (null != wqm) {
-                                executor = wqm.getAutomaticWorkQueue();
-                            }    
-                        }
                         if (executor != null) {
                             try {
                                 executor.execute(new JMSExecutor(message));

Modified: cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java (original)
+++ cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java Tue Aug  5 14:43:28 2008
@@ -494,10 +494,10 @@
                 WorkQueueManager workQueueManager =
                     bus.getExtension(WorkQueueManager.class);
                 Executor autoWorkQueue =
-                    workQueueManager.getAutomaticWorkQueue();
+                    workQueueManager.getNamedWorkQueue("ws-addressing");
                 executor = autoWorkQueue != null
                            ? autoWorkQueue
-                           : OneShotAsyncExecutor.getInstance();
+                           :  workQueueManager.getAutomaticWorkQueue();
             } else {
                 executor = OneShotAsyncExecutor.getInstance();
             }

Modified: cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/CountersClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/CountersClientServerTest.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/CountersClientServerTest.java (original)
+++ cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/CountersClientServerTest.java Tue Aug  5 14:43:28 2008
@@ -80,7 +80,7 @@
     
     @AfterClass
     public static void shutdownBus() throws Exception {
-        BusFactory.getDefaultBus().shutdown(false);
+        BusFactory.getDefaultBus().shutdown(true);
     }
     
     @Test
@@ -113,7 +113,8 @@
         
         assertEquals("The Counters are not create yet", 4, cr.getCounters().size());
         Set counterNames = mbs.queryNames(name, null);
-        assertEquals("The Counters are not export to JMX ", 4 + 2 , counterNames.size());
+        assertEquals("The Counters are not export to JMX: " + counterNames, 
+                     4 + 3 , counterNames.size());
        
         ObjectName sayHiCounter =  new ObjectName(ManagementConstants.DEFAULT_DOMAIN_NAME 
             + ":operation=\"{http://apache.org/hello_world_soap_http}sayHi\",*"); 
@@ -143,7 +144,7 @@
         greeter.greetMeOneWay("hello");
         assertEquals("The Counters are not create yet", 6, cr.getCounters().size());
         counterNames = mbs.queryNames(name, null);
-        assertEquals("The Counters are not export to JMX ", 6 + 2, counterNames.size());
+        assertEquals("The Counters are not export to JMX ", 6 + 3, counterNames.size());
         
         ObjectName greetMeOneWayCounter =  new ObjectName(ManagementConstants.DEFAULT_DOMAIN_NAME 
             + ":operation=\"{http://apache.org/hello_world_soap_http}greetMeOneWay\",*");

Modified: cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/ManagedBusTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/ManagedBusTest.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/ManagedBusTest.java (original)
+++ cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/ManagedBusTest.java Tue Aug  5 14:43:28 2008
@@ -27,6 +27,7 @@
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.management.InstrumentationManager;
 import org.apache.cxf.management.ManagementConstants;
 import org.apache.cxf.management.jmx.InstrumentationManagerImpl;
@@ -66,19 +67,46 @@
         assertNotNull(imi.getMBeanServer());
 
         WorkQueueManager manager = bus.getExtension(WorkQueueManager.class);
-        manager.getAutomaticWorkQueue();
-        
-        MBeanServer mbs = im.getMBeanServer();        
+                
+        MBeanServer mbs = im.getMBeanServer();      
         ObjectName name = new ObjectName(ManagementConstants.DEFAULT_DOMAIN_NAME 
-                                         + ":type=WorkQueueMBean,*");
+                                         + ":type=WorkQueueManagerMBean,*");
         Set s = mbs.queryNames(name, null);
-        assertTrue(s.size() == 1);
+        StringBuilder b = new StringBuilder();
+        for (ObjectName o : CastUtils.cast(s, ObjectName.class)) {
+            b.append(o.toString());
+            b.append("\n");
+        }
+        assertEquals("Size is wrong: " + b.toString(), 1, s.size());
+        
+        assertNotNull(manager.getNamedWorkQueue("testQueue"));
+        manager.getAutomaticWorkQueue();
+
+        name = new ObjectName(ManagementConstants.DEFAULT_DOMAIN_NAME 
+                             + ":type=WorkQueueMBean,*");
+        s = mbs.queryNames(name, null);
+        b = new StringBuilder();
+        for (ObjectName o : CastUtils.cast(s, ObjectName.class)) {
+            b.append(o.toString());
+            b.append("\n");
+        }
+        assertEquals("Size is wrong: " + b.toString(), 2, s.size());
+        
         Iterator it = s.iterator();
         while (it.hasNext()) {
             ObjectName n = (ObjectName)it.next();            
             Long result = 
-                (Long)mbs.invoke(n, "getWorkQueueMaxSize", new Object[0], new String[0]);            
-            assertEquals(result, Long.valueOf(250));
+                (Long)mbs.invoke(n, "getWorkQueueMaxSize", new Object[0], new String[0]);
+            assertEquals(result, Long.valueOf(256));                
+
+            Integer hwm = 
+                (Integer)mbs.invoke(n, "getHighWaterMark", new Object[0], new String[0]);
+
+            if (n.toString().contains("testQueue")) {
+                assertEquals(hwm, Integer.valueOf(50));
+            } else {
+                assertEquals(hwm, Integer.valueOf(25));
+            }
         }
 
         name = new ObjectName(ManagementConstants.DEFAULT_DOMAIN_NAME 
@@ -93,6 +121,6 @@
             mbs.invoke(n, "shutdown", params, sig);            
         }        
         
-        bus.shutdown(false);
+        bus.shutdown(true);
     }
 }

Modified: cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/ManagedClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/ManagedClientServerTest.java?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/ManagedClientServerTest.java (original)
+++ cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/ManagedClientServerTest.java Tue Aug  5 14:43:28 2008
@@ -94,7 +94,7 @@
         ObjectName name = new ObjectName(ManagementConstants.DEFAULT_DOMAIN_NAME
                                          + ":type=Bus.Service.Endpoint,*");
         Set s = mbs.queryNames(name, null);
-        assertTrue(s.size() == 1);
+        assertEquals(1, s.size());
         name = (ObjectName)s.iterator().next();
 
         Object val = mbs.invoke(name, "getState", new Object[0], new String[0]);

Modified: cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/managed-spring.xml
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/managed-spring.xml?rev=682970&r1=682969&r2=682970&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/managed-spring.xml (original)
+++ cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/management/managed-spring.xml Tue Aug  5 14:43:28 2008
@@ -22,10 +22,15 @@
        xsi:schemaLocation="
 http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
   
-    <bean id="org.apache.cxf.management.InstrumentationManager" class="org.apache.cxf.management.jmx.InstrumentationManagerImpl">
+    <bean id="org.apache.cxf.management.InstrumentationManager"
+        class="org.apache.cxf.management.jmx.InstrumentationManagerImpl">
         <property name="bus" ref="cxf" />
         <property name="enabled" value="true" />
         <property name="JMXServiceURL" value="service:jmx:rmi:///jndi/rmi://localhost:9916/jmxrmi" />
     </bean> 
  
+    <bean id="wq" class="org.apache.cxf.workqueue.AutomaticWorkQueueImpl">
+        <property name="name" value="testQueue"/>
+        <property name="highWaterMark" value="50"/>
+    </bean>
 </beans>
\ No newline at end of file