You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2007/12/05 22:27:23 UTC

svn commit: r601535 - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/ rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/main/java/org/apache/cxf/interceptor/ rt/core/src/...

Author: dkulp
Date: Wed Dec  5 13:27:05 2007
New Revision: 601535

URL: http://svn.apache.org/viewvc?rev=601535&view=rev
Log:
[CXF-1264] Make sure the ThreadDefaultBus is properly set when doing dispatches of messages
so that handlers, interceptors, services, etc. that may create a new client or
similar will get the correct bus.

Also, ThreadLocals are thread safe, so we don't need to synchronize to access them.

Modified:
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/BusFactory.java
    incubator/cxf/trunk/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java
    incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java
    incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
    incubator/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
    incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java

Modified: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/BusFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/BusFactory.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/BusFactory.java (original)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/BusFactory.java Wed Dec  5 13:27:05 2007
@@ -85,7 +85,7 @@
      * Sets the default bus for the thread.
      * @param bus the default bus.
      */
-    public static synchronized void setThreadDefaultBus(Bus bus) {
+    public static void setThreadDefaultBus(Bus bus) {
         localBus.set(bus);
     }
     
@@ -93,9 +93,17 @@
      * Gets the default bus for the thread.
      * @return the default bus.
      */
-    public static synchronized Bus getThreadDefaultBus() {
-        if (localBus.get() == null) {
-            Bus b = getDefaultBus();
+    public static Bus getThreadDefaultBus() {
+        return getThreadDefaultBus(true);
+    }
+    /**
+     * Gets the default bus for the thread, creating if needed
+     * @param createIfNeeded Set to true to create a default bus if one doesn't exist
+     * @return the default bus.
+     */
+    public static Bus getThreadDefaultBus(boolean createIfNeeded) {
+        if (createIfNeeded && localBus.get() == null) {
+            Bus b = getDefaultBus(createIfNeeded);
             localBus.set(b);
         }
         return localBus.get();

Modified: incubator/cxf/trunk/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java (original)
+++ incubator/cxf/trunk/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java Wed Dec  5 13:27:05 2007
@@ -28,6 +28,7 @@
 import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.binding.Binding;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
@@ -55,44 +56,50 @@
     }
 
     public void onMessage(Message m) {
-        if (LOG.isLoggable(Level.FINER)) {
-            LOG.finer("Processing Message at collocated endpoint.  Request message: " + m);
-        }
-        Exchange ex = new ExchangeImpl();
-        setExchangeProperties(ex, m);
-        
-        Message inMsg = endpoint.getBinding().createMessage();
-        MessageImpl.copyContent(m, inMsg);
-        
-        //Copy Request Context to Server inBound Message
-        //TODO a Context Filter Strategy required. 
-        inMsg.putAll(m);
-
-        inMsg.put(COLOCATED, Boolean.TRUE);
-        inMsg.put(Message.REQUESTOR_ROLE, Boolean.FALSE);
-        inMsg.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
-        OperationInfo oi = ex.get(OperationInfo.class);
-        if (oi != null) {
-            inMsg.put(MessageInfo.class, oi.getInput());
-        }
-        ex.setInMessage(inMsg);
-        inMsg.setExchange(ex);
-        
-        if (LOG.isLoggable(Level.FINEST)) {
-            LOG.finest("Build inbound interceptor chain.");
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
+            if (LOG.isLoggable(Level.FINER)) {
+                LOG.finer("Processing Message at collocated endpoint.  Request message: " + m);
+            }
+            Exchange ex = new ExchangeImpl();
+            setExchangeProperties(ex, m);
+            
+            Message inMsg = endpoint.getBinding().createMessage();
+            MessageImpl.copyContent(m, inMsg);
+            
+            //Copy Request Context to Server inBound Message
+            //TODO a Context Filter Strategy required. 
+            inMsg.putAll(m);
+    
+            inMsg.put(COLOCATED, Boolean.TRUE);
+            inMsg.put(Message.REQUESTOR_ROLE, Boolean.FALSE);
+            inMsg.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
+            OperationInfo oi = ex.get(OperationInfo.class);
+            if (oi != null) {
+                inMsg.put(MessageInfo.class, oi.getInput());
+            }
+            ex.setInMessage(inMsg);
+            inMsg.setExchange(ex);
+            
+            if (LOG.isLoggable(Level.FINEST)) {
+                LOG.finest("Build inbound interceptor chain.");
+            }
+    
+            //Add all interceptors between USER_LOGICAL and INVOKE.
+            SortedSet<Phase> phases = new TreeSet<Phase>(bus.getExtension(PhaseManager.class).getInPhases());
+            ColocUtil.setPhases(phases, Phase.USER_LOGICAL, Phase.INVOKE);
+            InterceptorChain chain = ColocUtil.getInInterceptorChain(ex, phases);
+            chain.add(addColocInterceptors());
+            inMsg.setInterceptorChain(chain);
+    
+            chain.doIntercept(inMsg);
+    
+            //Set Server OutBound Message onto InBound Exchange.
+            setOutBoundMessage(ex, m.getExchange());
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
         }
-
-        //Add all interceptors between USER_LOGICAL and INVOKE.
-        SortedSet<Phase> phases = new TreeSet<Phase>(bus.getExtension(PhaseManager.class).getInPhases());
-        ColocUtil.setPhases(phases, Phase.USER_LOGICAL, Phase.INVOKE);
-        InterceptorChain chain = ColocUtil.getInInterceptorChain(ex, phases);
-        chain.add(addColocInterceptors());
-        inMsg.setInterceptorChain(chain);
-
-        chain.doIntercept(inMsg);
-
-        //Set Server OutBound Message onto InBound Exchange.
-        setOutBoundMessage(ex, m.getExchange());
     }
     
     protected void setOutBoundMessage(Exchange from, Exchange to) {

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Wed Dec  5 13:27:05 2007
@@ -226,101 +226,109 @@
                            Object[] params, 
                            Map<String, Object> context,
                            Exchange exchange) throws Exception {
-        if (exchange == null) {
-            exchange = new ExchangeImpl();
-        }
-        Endpoint endpoint = getEndpoint();
-
-        Map<String, Object> requestContext = null;
-        Map<String, Object> responseContext = null;
-        if (LOG.isLoggable(Level.FINE)) {
-            LOG.fine("Invoke, operation info: " + oi + ", params: " + params);
-        }
-        Message message = endpoint.getBinding().createMessage();
-        if (null != context) {
-            requestContext = CastUtils.cast((Map)context.get(REQUEST_CONTEXT));
-            responseContext = CastUtils.cast((Map)context.get(RESPONSE_CONTEXT));
-            message.put(Message.INVOCATION_CONTEXT, context);
-        }    
-        //setup the message context
-        setContext(requestContext, message);
-        setParameters(params, message);
-
-        if (null != requestContext) {
-            exchange.putAll(requestContext);
-        }
-        exchange.setOneWay(oi.getOutput() == null);
-
-        exchange.setOutMessage(message);
-        
-        setOutMessageProperties(message, oi);
-        setExchangeProperties(exchange, endpoint, oi);
-        
-        // setup chain
-
-        PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
-        message.setInterceptorChain(chain);
-        
-        modifyChain(chain, requestContext);
-        chain.setFaultObserver(outFaultObserver);
-        
-        // setup conduit selector
-        prepareConduitSelector(message);
-        
-        // execute chain        
-        chain.doIntercept(message);
-
+                
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
         
-        // Check to see if there is a Fault from the outgoing chain
-        Exception ex = message.getContent(Exception.class);
-        boolean mepCompleteCalled = false;
-        if (ex != null) {
-            getConduitSelector().complete(exchange);
-            mepCompleteCalled = true;
-            if (message.getContent(Exception.class) != null) {
-                throw ex;
+            if (exchange == null) {
+                exchange = new ExchangeImpl();
             }
-        }
-        ex = message.getExchange().get(Exception.class);
-        if (ex != null) {
-            if (!mepCompleteCalled) {
+            Endpoint endpoint = getEndpoint();
+    
+            Map<String, Object> requestContext = null;
+            Map<String, Object> responseContext = null;
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("Invoke, operation info: " + oi + ", params: " + params);
+            }
+            Message message = endpoint.getBinding().createMessage();
+            if (null != context) {
+                requestContext = CastUtils.cast((Map)context.get(REQUEST_CONTEXT));
+                responseContext = CastUtils.cast((Map)context.get(RESPONSE_CONTEXT));
+                message.put(Message.INVOCATION_CONTEXT, context);
+            }    
+            //setup the message context
+            setContext(requestContext, message);
+            setParameters(params, message);
+    
+            if (null != requestContext) {
+                exchange.putAll(requestContext);
+            }
+            exchange.setOneWay(oi.getOutput() == null);
+    
+            exchange.setOutMessage(message);
+            
+            setOutMessageProperties(message, oi);
+            setExchangeProperties(exchange, endpoint, oi);
+            
+            // setup chain
+    
+            PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
+            message.setInterceptorChain(chain);
+            
+            modifyChain(chain, requestContext);
+            chain.setFaultObserver(outFaultObserver);
+            
+            // setup conduit selector
+            prepareConduitSelector(message);
+            
+            // execute chain        
+            chain.doIntercept(message);
+    
+            
+            // Check to see if there is a Fault from the outgoing chain
+            Exception ex = message.getContent(Exception.class);
+            boolean mepCompleteCalled = false;
+            if (ex != null) {
                 getConduitSelector().complete(exchange);
+                mepCompleteCalled = true;
+                if (message.getContent(Exception.class) != null) {
+                    throw ex;
+                }
             }
-            throw ex;
-        }
-        
-        // Wait for a response if we need to
-        if (!oi.getOperationInfo().isOneWay()) {
-            synchronized (exchange) {
-                waitResponse(exchange);
+            ex = message.getExchange().get(Exception.class);
+            if (ex != null) {
+                if (!mepCompleteCalled) {
+                    getConduitSelector().complete(exchange);
+                }
+                throw ex;
             }
-        }
-        getConduitSelector().complete(exchange);
-
-        // Grab the response objects if there are any
-        List resList = null;
-        Message inMsg = exchange.getInMessage();
-        if (inMsg != null) {
-            if (null != responseContext) {                   
-                responseContext.putAll(inMsg);
-                if (LOG.isLoggable(Level.FINE)) {
-                    LOG.fine("set responseContext to be" + responseContext);
+            
+            // Wait for a response if we need to
+            if (!oi.getOperationInfo().isOneWay()) {
+                synchronized (exchange) {
+                    waitResponse(exchange);
                 }
             }
-            resList = inMsg.getContent(List.class);
-        }
-        
-        // check for an incoming fault
-        ex = getException(exchange);
-        
-        if (ex != null) {
-            throw ex;
-        }
-        
-        if (resList != null) {
-            return resList.toArray();
+            getConduitSelector().complete(exchange);
+    
+            // Grab the response objects if there are any
+            List resList = null;
+            Message inMsg = exchange.getInMessage();
+            if (inMsg != null) {
+                if (null != responseContext) {                   
+                    responseContext.putAll(inMsg);
+                    if (LOG.isLoggable(Level.FINE)) {
+                        LOG.fine("set responseContext to be" + responseContext);
+                    }
+                }
+                resList = inMsg.getContent(List.class);
+            }
+            
+            // check for an incoming fault
+            ex = getException(exchange);
+            
+            if (ex != null) {
+                throw ex;
+            }
+            
+            if (resList != null) {
+                return resList.toArray();
+            }
+            return null;
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
         }
-        return null;
     }
 
     protected Exception getException(Exchange exchange) {
@@ -405,6 +413,8 @@
         
         chain.setFaultObserver(outFaultObserver);
         
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
         // execute chain
         try {
             String startingAfterInterceptorID = (String) message.get(
@@ -426,6 +436,7 @@
                     message.getExchange().notifyAll();
                 }
             }
+            BusFactory.setThreadDefaultBus(origBus);
         }
     }
 

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java Wed Dec  5 13:27:05 2007
@@ -24,6 +24,7 @@
 import java.util.logging.Logger;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.message.Exchange;
@@ -47,50 +48,58 @@
     public void onMessage(Message message) {
       
         assert null != message;
-        Exchange exchange = message.getExchange();
-
-        Message faultMessage = null;
-
-        // now that we have switched over to the fault chain,
-        // prevent any further operations on the in/out message 
-
-        if (isOutboundObserver()) {
-            Exception ex = message.getContent(Exception.class);
-            if (!(ex instanceof Fault)) {
-                ex = new Fault(ex);
-            }
-            FaultMode mode = (FaultMode)message.get(FaultMode.class);
-            
-            faultMessage = exchange.getOutMessage();
-            if (null == faultMessage) {
-                faultMessage = exchange.get(Endpoint.class).getBinding().createMessage();
-            }
-            faultMessage.setContent(Exception.class, ex);
-            if (null != mode) {
-                faultMessage.put(FaultMode.class, mode);
-            }
-            exchange.setOutMessage(null);
-            exchange.setOutFaultMessage(faultMessage);
-            if (message.get(BindingFaultInfo.class) != null) {
-                faultMessage.put(BindingFaultInfo.class, message.get(BindingFaultInfo.class));
-            }
-        } else {
-            faultMessage = message;
-            exchange.setInMessage(null);
-            exchange.setInFaultMessage(faultMessage);
-        }          
-         
-       
-        // setup chain
-        PhaseInterceptorChain chain = new PhaseInterceptorChain(getPhases());
-        initializeInterceptors(faultMessage.getExchange(), chain);
         
-        faultMessage.setInterceptorChain(chain);
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
         try {
-            chain.doIntercept(faultMessage);
-        } catch (Exception exc) {
-            LogUtils.log(LOG, Level.SEVERE, "Error occurred during error handling, give up!", exc);
-            throw new RuntimeException(exc.getCause());
+            
+            Exchange exchange = message.getExchange();
+    
+            Message faultMessage = null;
+    
+            // now that we have switched over to the fault chain,
+            // prevent any further operations on the in/out message 
+    
+            if (isOutboundObserver()) {
+                Exception ex = message.getContent(Exception.class);
+                if (!(ex instanceof Fault)) {
+                    ex = new Fault(ex);
+                }
+                FaultMode mode = (FaultMode)message.get(FaultMode.class);
+                
+                faultMessage = exchange.getOutMessage();
+                if (null == faultMessage) {
+                    faultMessage = exchange.get(Endpoint.class).getBinding().createMessage();
+                }
+                faultMessage.setContent(Exception.class, ex);
+                if (null != mode) {
+                    faultMessage.put(FaultMode.class, mode);
+                }
+                exchange.setOutMessage(null);
+                exchange.setOutFaultMessage(faultMessage);
+                if (message.get(BindingFaultInfo.class) != null) {
+                    faultMessage.put(BindingFaultInfo.class, message.get(BindingFaultInfo.class));
+                }
+            } else {
+                faultMessage = message;
+                exchange.setInMessage(null);
+                exchange.setInFaultMessage(faultMessage);
+            }          
+             
+           
+            // setup chain
+            PhaseInterceptorChain chain = new PhaseInterceptorChain(getPhases());
+            initializeInterceptors(faultMessage.getExchange(), chain);
+            
+            faultMessage.setInterceptorChain(chain);
+            try {
+                chain.doIntercept(faultMessage);
+            } catch (Exception exc) {
+                LogUtils.log(LOG, Level.SEVERE, "Error occurred during error handling, give up!", exc);
+                throw new RuntimeException(exc.getCause());
+            }
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
         }
     }
 

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java Wed Dec  5 13:27:05 2007
@@ -26,6 +26,7 @@
 import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.binding.Binding;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.message.Exchange;
@@ -51,27 +52,33 @@
     }
 
     public void onMessage(Message m) {
-        Message message = getBinding().createMessage(m);
-        Exchange exchange = message.getExchange();
-        if (exchange == null) {
-            exchange = new ExchangeImpl();
-            exchange.setInMessage(message);
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
+            Message message = getBinding().createMessage(m);
+            Exchange exchange = message.getExchange();
+            if (exchange == null) {
+                exchange = new ExchangeImpl();
+                exchange.setInMessage(message);
+            }
+            setExchangeProperties(exchange, message);
+    
+            // setup chain
+            PhaseInterceptorChain chain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
+                                                         bus.getInInterceptors(),
+                                                         endpoint.getInInterceptors(),
+                                                         getBinding().getInInterceptors(),
+                                                         endpoint.getService().getInInterceptors());
+            
+            
+            message.setInterceptorChain(chain);
+            
+            chain.setFaultObserver(endpoint.getOutFaultObserver());
+           
+            chain.doIntercept(message);
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
         }
-        setExchangeProperties(exchange, message);
-
-        // setup chain
-        PhaseInterceptorChain chain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
-                                                     bus.getInInterceptors(),
-                                                     endpoint.getInInterceptors(),
-                                                     getBinding().getInInterceptors(),
-                                                     endpoint.getService().getInInterceptors());
-        
-        
-        message.setInterceptorChain(chain);
-        
-        chain.setFaultObserver(endpoint.getOutFaultObserver());
-       
-        chain.doIntercept(message);        
     }
 
 

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java Wed Dec  5 13:27:05 2007
@@ -24,6 +24,7 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.message.Exchange;
@@ -54,32 +55,38 @@
     }
 
     public void onMessage(Message message) {
-        message = createMessage(message);
-        Exchange exchange = message.getExchange();
-        if (exchange == null) {
-            exchange = new ExchangeImpl();
-            exchange.setInMessage(message);
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
+            message = createMessage(message);
+            Exchange exchange = message.getExchange();
+            if (exchange == null) {
+                exchange = new ExchangeImpl();
+                exchange.setInMessage(message);
+            }
+            setExchangeProperties(exchange, message);
+            
+            // setup chain
+            PhaseInterceptorChain chain = createChain();
+            
+            message.setInterceptorChain(chain);
+            
+            chain.add(bus.getInInterceptors());
+            if (bindingInterceptors != null) {
+                chain.add(bindingInterceptors);
+            }
+            if (routingInterceptors != null) {
+                chain.add(routingInterceptors);
+            }
+            
+            if (endpoints != null) {
+                exchange.put(ENDPOINTS, endpoints);
+            }
+            
+            chain.doIntercept(message);
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
         }
-        setExchangeProperties(exchange, message);
-        
-        // setup chain
-        PhaseInterceptorChain chain = createChain();
-        
-        message.setInterceptorChain(chain);
-        
-        chain.add(bus.getInInterceptors());
-        if (bindingInterceptors != null) {
-            chain.add(bindingInterceptors);
-        }
-        if (routingInterceptors != null) {
-            chain.add(routingInterceptors);
-        }
-        
-        if (endpoints != null) {
-            exchange.put(ENDPOINTS, endpoints);
-        }
-        
-        chain.doIntercept(message);        
     }
 
     /**

Modified: incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java (original)
+++ incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java Wed Dec  5 13:27:05 2007
@@ -54,6 +54,7 @@
 import javax.xml.ws.soap.SOAPFaultException;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.binding.soap.SoapBinding;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Client;
@@ -127,94 +128,100 @@
             LOG.info("Dispatch: invoke called");
         }
 
-        Endpoint endpoint = getEndpoint();
-        Message message = endpoint.getBinding().createMessage();
-
-        if (context != null) {
-            message.setContent(JAXBContext.class, context);
-        }
-        
-        
-        Map<String, Object> reqContext = new HashMap<String, Object>(this.getRequestContext());
-        Map<String, Object> respContext = this.getResponseContext();
-        // clear the response context's hold information
-        // Not call the clear Context is to avoid the error 
-        // that getResponseContext() would be called by Client code first
-        respContext.clear();
-        
-        ContextPropertiesMapping.mapRequestfromJaxws2Cxf(reqContext);
-        message.putAll(reqContext);
-        //need to do context mapping from jax-ws to cxf message
-        
-        Exchange exchange = new ExchangeImpl();
-
-        exchange.setOutMessage(message);
-        setExchangeProperties(exchange, endpoint);
-
-        message.setContent(Object.class, obj);
-        
-        if (obj instanceof SOAPMessage) {
-            message.setContent(SOAPMessage.class, obj);
-        } else if (obj instanceof Source) {
-            message.setContent(Source.class, obj);
-        } else if (obj instanceof DataSource) {
-            message.setContent(DataSource.class, obj);
-        }
-  
-        message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
-
-        PhaseInterceptorChain chain = getDispatchOutChain(endpoint);
-        message.setInterceptorChain(chain);
-
-        // setup conduit selector
-        prepareConduitSelector(message);
-        
-        // execute chain
-        chain.doIntercept(message);
-        
-                
-        if (message.getContent(Exception.class) != null) {
-            getConduitSelector().complete(exchange);
-            if (getBinding() instanceof SOAPBinding) {
-                try {
-                    SOAPFault soapFault = SOAPFactory.newInstance().createFault();
-                    Fault fault = (Fault)message.getContent(Exception.class);
-                    soapFault.setFaultCode(fault.getFaultCode());
-                    soapFault.setFaultString(fault.getMessage());
-                    SOAPFaultException exception = new SOAPFaultException(soapFault);
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try { 
+            Endpoint endpoint = getEndpoint();
+            Message message = endpoint.getBinding().createMessage();
+    
+            if (context != null) {
+                message.setContent(JAXBContext.class, context);
+            }
+            
+            
+            Map<String, Object> reqContext = new HashMap<String, Object>(this.getRequestContext());
+            Map<String, Object> respContext = this.getResponseContext();
+            // clear the response context's hold information
+            // Not call the clear Context is to avoid the error 
+            // that getResponseContext() would be called by Client code first
+            respContext.clear();
+            
+            ContextPropertiesMapping.mapRequestfromJaxws2Cxf(reqContext);
+            message.putAll(reqContext);
+            //need to do context mapping from jax-ws to cxf message
+            
+            Exchange exchange = new ExchangeImpl();
+    
+            exchange.setOutMessage(message);
+            setExchangeProperties(exchange, endpoint);
+    
+            message.setContent(Object.class, obj);
+            
+            if (obj instanceof SOAPMessage) {
+                message.setContent(SOAPMessage.class, obj);
+            } else if (obj instanceof Source) {
+                message.setContent(Source.class, obj);
+            } else if (obj instanceof DataSource) {
+                message.setContent(DataSource.class, obj);
+            }
+      
+            message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+    
+            PhaseInterceptorChain chain = getDispatchOutChain(endpoint);
+            message.setInterceptorChain(chain);
+    
+            // setup conduit selector
+            prepareConduitSelector(message);
+            
+            // execute chain
+            chain.doIntercept(message);
+            
+                    
+            if (message.getContent(Exception.class) != null) {
+                getConduitSelector().complete(exchange);
+                if (getBinding() instanceof SOAPBinding) {
+                    try {
+                        SOAPFault soapFault = SOAPFactory.newInstance().createFault();
+                        Fault fault = (Fault)message.getContent(Exception.class);
+                        soapFault.setFaultCode(fault.getFaultCode());
+                        soapFault.setFaultString(fault.getMessage());
+                        SOAPFaultException exception = new SOAPFaultException(soapFault);
+                        throw exception;
+                    } catch (SOAPException e) {
+                        throw new WebServiceException(e);
+                    }
+                } else if (getBinding() instanceof HTTPBinding) {
+                    HTTPException exception = new HTTPException(HttpURLConnection.HTTP_INTERNAL_ERROR);
+                    exception.initCause(message.getContent(Exception.class));
                     throw exception;
-                } catch (SOAPException e) {
-                    throw new WebServiceException(e);
+                } else {
+                    throw new WebServiceException(message.getContent(Exception.class));
                 }
-            } else if (getBinding() instanceof HTTPBinding) {
-                HTTPException exception = new HTTPException(HttpURLConnection.HTTP_INTERNAL_ERROR);
-                exception.initCause(message.getContent(Exception.class));
-                throw exception;
+            }
+    
+            // correlate response        
+            if (getConduitSelector().selectConduit(message).getBackChannel() != null) {
+                // process partial response and wait for decoupled response
             } else {
-                throw new WebServiceException(message.getContent(Exception.class));
+                // process response: send was synchronous so when we get here we can assume that the 
+                // Exchange's inbound message is set and had been passed through the inbound
+                // interceptor chain.
             }
-        }
-
-        // correlate response        
-        if (getConduitSelector().selectConduit(message).getBackChannel() != null) {
-            // process partial response and wait for decoupled response
-        } else {
-            // process response: send was synchronous so when we get here we can assume that the 
-            // Exchange's inbound message is set and had been passed through the inbound interceptor chain.
-        }
-
-        if (!isOneWay) {
-            synchronized (exchange) {
-                Message inMsg = waitResponse(exchange);
-                respContext.putAll(inMsg);
-                getConduitSelector().complete(exchange);
-                //need to do context mapping from cxf message to jax-ws 
-                ContextPropertiesMapping.mapResponsefromCxf2Jaxws(respContext);
-                return cl.cast(inMsg.getContent(Object.class));
+    
+            if (!isOneWay) {
+                synchronized (exchange) {
+                    Message inMsg = waitResponse(exchange);
+                    respContext.putAll(inMsg);
+                    getConduitSelector().complete(exchange);
+                    //need to do context mapping from cxf message to jax-ws 
+                    ContextPropertiesMapping.mapResponsefromCxf2Jaxws(respContext);
+                    return cl.cast(inMsg.getContent(Object.class));
+                }
             }
-        }
-        return null;
-        
+            return null;
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
+        }        
     }
 
     private Message waitResponse(Exchange exchange) {
@@ -308,6 +315,8 @@
         chain.add(inInterceptors);
 
         // execute chain
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
         try {
             chain.doIntercept(message);
         } finally {
@@ -316,6 +325,7 @@
                 message.getExchange().setInMessage(message);
                 message.getExchange().notifyAll();
             }
+            BusFactory.setThreadDefaultBus(origBus);
         }
     }
 

Modified: incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java Wed Dec  5 13:27:05 2007
@@ -31,6 +31,7 @@
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.helpers.HttpHeaderHelper;
@@ -237,7 +238,12 @@
         }
 
         // REVISIT: service on executor if associated with endpoint
-        serviceRequest(req, resp);
+        try {
+            BusFactory.setThreadDefaultBus(bus); 
+            serviceRequest(req, resp);
+        } finally {
+            BusFactory.setThreadDefaultBus(null);  
+        }    
     }
 
     protected void serviceRequest(final HttpServletRequest req, final HttpServletResponse resp)

Modified: incubator/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java (original)
+++ incubator/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java Wed Dec  5 13:27:05 2007
@@ -33,6 +33,7 @@
 import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.bus.CXFBusImpl;
 import org.apache.cxf.common.util.Base64Utility;
 import org.apache.cxf.common.util.StringUtils;
@@ -81,6 +82,7 @@
     private static final String DIGEST_CHALLENGE = "Digest realm=luna";
     private static final String CUSTOM_CHALLENGE = "Custom realm=sol";
     private Bus bus;
+    private Bus threadDefaultBus;
     private Conduit decoupledBackChannel;
     private EndpointInfo endpointInfo;
     private EndpointReferenceType address;
@@ -137,6 +139,7 @@
         is = null;
         os = null;
         destination = null;
+        BusFactory.setDefaultBus(null); 
     }
     
     @Test
@@ -194,10 +197,17 @@
 
     @Test
     public void testDoService() throws Exception {
+        Bus defaultBus = new CXFBusImpl();
+        assertSame("Default thread bus has not been set",
+                   defaultBus, BusFactory.getThreadDefaultBus()); 
         destination = setUpDestination(false, false);
         setUpDoService(false);
+        assertSame("Default thread bus has been unexpectedly reset",
+                   defaultBus, BusFactory.getThreadDefaultBus());
         destination.doService(request, response);
         verifyDoService();
+        assertSame("Default thread bus has not been reset",
+                    defaultBus, BusFactory.getThreadDefaultBus());
     }
     
     @Test
@@ -491,6 +501,7 @@
         observer = new MessageObserver() {
             public void onMessage(Message m) {
                 inMessage = m;
+                threadDefaultBus = BusFactory.getThreadDefaultBus();
             }
         };
         dest.setMessageObserver(observer);
@@ -704,6 +715,8 @@
     }
 
     private void verifyDoService() throws Exception {
+        assertSame("Default thread bus has not been set for request",
+                    bus, threadDefaultBus);
         assertNotNull("unexpected null message", inMessage);
         assertSame("unexpected HTTP request",
                    inMessage.get(JettyHTTPDestination.HTTP_REQUEST),

Modified: incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Wed Dec  5 13:27:05 2007
@@ -41,6 +41,7 @@
 import javax.naming.NamingException;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.configuration.Configurable;
 import org.apache.cxf.configuration.Configurer;
@@ -195,13 +196,17 @@
                         
             inMessage.setDestination(this);            
             
+            BusFactory.setThreadDefaultBus(bus);
+            
             //handle the incoming message
             incomingObserver.onMessage(inMessage);
            
         } catch (JMSException jmsex) {
             //TODO: need to revisit for which exception should we throw.
             throw new IOException(jmsex.getMessage());
-        } 
+        } finally {
+            BusFactory.setThreadDefaultBus(null);
+        }
     }
     
     public void connected(javax.jms.Destination target,