You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2007/12/05 22:27:23 UTC
svn commit: r601535 - in /incubator/cxf/trunk:
api/src/main/java/org/apache/cxf/
rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/
rt/core/src/main/java/org/apache/cxf/endpoint/
rt/core/src/main/java/org/apache/cxf/interceptor/ rt/core/src/...
Author: dkulp
Date: Wed Dec 5 13:27:05 2007
New Revision: 601535
URL: http://svn.apache.org/viewvc?rev=601535&view=rev
Log:
[CXF-1264] Make sure the ThreadDefaultBus is propery set when doing dispatches of messages
so that handlers, interceptors, services, etc.... that may create a new client or
similar will get the correct bus.
Also, ThreadLocals are thread safe, we don't need to synchronize to access them.
Modified:
incubator/cxf/trunk/api/src/main/java/org/apache/cxf/BusFactory.java
incubator/cxf/trunk/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java
incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java
incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java
incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java
incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
incubator/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
Modified: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/BusFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/BusFactory.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/BusFactory.java (original)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/BusFactory.java Wed Dec 5 13:27:05 2007
@@ -85,7 +85,7 @@
* Sets the default bus for the thread.
* @param bus the default bus.
*/
- public static synchronized void setThreadDefaultBus(Bus bus) {
+ public static void setThreadDefaultBus(Bus bus) {
localBus.set(bus);
}
@@ -93,9 +93,17 @@
* Gets the default bus for the thread.
* @return the default bus.
*/
- public static synchronized Bus getThreadDefaultBus() {
- if (localBus.get() == null) {
- Bus b = getDefaultBus();
+ public static Bus getThreadDefaultBus() {
+ return getThreadDefaultBus(true);
+ }
+ /**
+ * Gets the default bus for the thread, creating if needed
+ * @param createIfNeeded Set to true to create a default bus if one doesn't exist
+ * @return the default bus.
+ */
+ public static Bus getThreadDefaultBus(boolean createIfNeeded) {
+ if (createIfNeeded && localBus.get() == null) {
+ Bus b = getDefaultBus(createIfNeeded);
localBus.set(b);
}
return localBus.get();
Modified: incubator/cxf/trunk/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java (original)
+++ incubator/cxf/trunk/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java Wed Dec 5 13:27:05 2007
@@ -28,6 +28,7 @@
import javax.xml.namespace.QName;
import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
import org.apache.cxf.binding.Binding;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Endpoint;
@@ -55,44 +56,50 @@
}
public void onMessage(Message m) {
- if (LOG.isLoggable(Level.FINER)) {
- LOG.finer("Processing Message at collocated endpoint. Request message: " + m);
- }
- Exchange ex = new ExchangeImpl();
- setExchangeProperties(ex, m);
-
- Message inMsg = endpoint.getBinding().createMessage();
- MessageImpl.copyContent(m, inMsg);
-
- //Copy Request Context to Server inBound Message
- //TODO a Context Filter Strategy required.
- inMsg.putAll(m);
-
- inMsg.put(COLOCATED, Boolean.TRUE);
- inMsg.put(Message.REQUESTOR_ROLE, Boolean.FALSE);
- inMsg.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
- OperationInfo oi = ex.get(OperationInfo.class);
- if (oi != null) {
- inMsg.put(MessageInfo.class, oi.getInput());
- }
- ex.setInMessage(inMsg);
- inMsg.setExchange(ex);
-
- if (LOG.isLoggable(Level.FINEST)) {
- LOG.finest("Build inbound interceptor chain.");
+ Bus origBus = BusFactory.getThreadDefaultBus(false);
+ BusFactory.setThreadDefaultBus(bus);
+ try {
+ if (LOG.isLoggable(Level.FINER)) {
+ LOG.finer("Processing Message at collocated endpoint. Request message: " + m);
+ }
+ Exchange ex = new ExchangeImpl();
+ setExchangeProperties(ex, m);
+
+ Message inMsg = endpoint.getBinding().createMessage();
+ MessageImpl.copyContent(m, inMsg);
+
+ //Copy Request Context to Server inBound Message
+ //TODO a Context Filter Strategy required.
+ inMsg.putAll(m);
+
+ inMsg.put(COLOCATED, Boolean.TRUE);
+ inMsg.put(Message.REQUESTOR_ROLE, Boolean.FALSE);
+ inMsg.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
+ OperationInfo oi = ex.get(OperationInfo.class);
+ if (oi != null) {
+ inMsg.put(MessageInfo.class, oi.getInput());
+ }
+ ex.setInMessage(inMsg);
+ inMsg.setExchange(ex);
+
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.finest("Build inbound interceptor chain.");
+ }
+
+ //Add all interceptors between USER_LOGICAL and INVOKE.
+ SortedSet<Phase> phases = new TreeSet<Phase>(bus.getExtension(PhaseManager.class).getInPhases());
+ ColocUtil.setPhases(phases, Phase.USER_LOGICAL, Phase.INVOKE);
+ InterceptorChain chain = ColocUtil.getInInterceptorChain(ex, phases);
+ chain.add(addColocInterceptors());
+ inMsg.setInterceptorChain(chain);
+
+ chain.doIntercept(inMsg);
+
+ //Set Server OutBound Message onto InBound Exchange.
+ setOutBoundMessage(ex, m.getExchange());
+ } finally {
+ BusFactory.setThreadDefaultBus(origBus);
}
-
- //Add all interceptors between USER_LOGICAL and INVOKE.
- SortedSet<Phase> phases = new TreeSet<Phase>(bus.getExtension(PhaseManager.class).getInPhases());
- ColocUtil.setPhases(phases, Phase.USER_LOGICAL, Phase.INVOKE);
- InterceptorChain chain = ColocUtil.getInInterceptorChain(ex, phases);
- chain.add(addColocInterceptors());
- inMsg.setInterceptorChain(chain);
-
- chain.doIntercept(inMsg);
-
- //Set Server OutBound Message onto InBound Exchange.
- setOutBoundMessage(ex, m.getExchange());
}
protected void setOutBoundMessage(Exchange from, Exchange to) {
Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Wed Dec 5 13:27:05 2007
@@ -226,101 +226,109 @@
Object[] params,
Map<String, Object> context,
Exchange exchange) throws Exception {
- if (exchange == null) {
- exchange = new ExchangeImpl();
- }
- Endpoint endpoint = getEndpoint();
-
- Map<String, Object> requestContext = null;
- Map<String, Object> responseContext = null;
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Invoke, operation info: " + oi + ", params: " + params);
- }
- Message message = endpoint.getBinding().createMessage();
- if (null != context) {
- requestContext = CastUtils.cast((Map)context.get(REQUEST_CONTEXT));
- responseContext = CastUtils.cast((Map)context.get(RESPONSE_CONTEXT));
- message.put(Message.INVOCATION_CONTEXT, context);
- }
- //setup the message context
- setContext(requestContext, message);
- setParameters(params, message);
-
- if (null != requestContext) {
- exchange.putAll(requestContext);
- }
- exchange.setOneWay(oi.getOutput() == null);
-
- exchange.setOutMessage(message);
-
- setOutMessageProperties(message, oi);
- setExchangeProperties(exchange, endpoint, oi);
-
- // setup chain
-
- PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
- message.setInterceptorChain(chain);
-
- modifyChain(chain, requestContext);
- chain.setFaultObserver(outFaultObserver);
-
- // setup conduit selector
- prepareConduitSelector(message);
-
- // execute chain
- chain.doIntercept(message);
-
+
+ Bus origBus = BusFactory.getThreadDefaultBus(false);
+ BusFactory.setThreadDefaultBus(bus);
+ try {
- // Check to see if there is a Fault from the outgoing chain
- Exception ex = message.getContent(Exception.class);
- boolean mepCompleteCalled = false;
- if (ex != null) {
- getConduitSelector().complete(exchange);
- mepCompleteCalled = true;
- if (message.getContent(Exception.class) != null) {
- throw ex;
+ if (exchange == null) {
+ exchange = new ExchangeImpl();
}
- }
- ex = message.getExchange().get(Exception.class);
- if (ex != null) {
- if (!mepCompleteCalled) {
+ Endpoint endpoint = getEndpoint();
+
+ Map<String, Object> requestContext = null;
+ Map<String, Object> responseContext = null;
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Invoke, operation info: " + oi + ", params: " + params);
+ }
+ Message message = endpoint.getBinding().createMessage();
+ if (null != context) {
+ requestContext = CastUtils.cast((Map)context.get(REQUEST_CONTEXT));
+ responseContext = CastUtils.cast((Map)context.get(RESPONSE_CONTEXT));
+ message.put(Message.INVOCATION_CONTEXT, context);
+ }
+ //setup the message context
+ setContext(requestContext, message);
+ setParameters(params, message);
+
+ if (null != requestContext) {
+ exchange.putAll(requestContext);
+ }
+ exchange.setOneWay(oi.getOutput() == null);
+
+ exchange.setOutMessage(message);
+
+ setOutMessageProperties(message, oi);
+ setExchangeProperties(exchange, endpoint, oi);
+
+ // setup chain
+
+ PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
+ message.setInterceptorChain(chain);
+
+ modifyChain(chain, requestContext);
+ chain.setFaultObserver(outFaultObserver);
+
+ // setup conduit selector
+ prepareConduitSelector(message);
+
+ // execute chain
+ chain.doIntercept(message);
+
+
+ // Check to see if there is a Fault from the outgoing chain
+ Exception ex = message.getContent(Exception.class);
+ boolean mepCompleteCalled = false;
+ if (ex != null) {
getConduitSelector().complete(exchange);
+ mepCompleteCalled = true;
+ if (message.getContent(Exception.class) != null) {
+ throw ex;
+ }
}
- throw ex;
- }
-
- // Wait for a response if we need to
- if (!oi.getOperationInfo().isOneWay()) {
- synchronized (exchange) {
- waitResponse(exchange);
+ ex = message.getExchange().get(Exception.class);
+ if (ex != null) {
+ if (!mepCompleteCalled) {
+ getConduitSelector().complete(exchange);
+ }
+ throw ex;
}
- }
- getConduitSelector().complete(exchange);
-
- // Grab the response objects if there are any
- List resList = null;
- Message inMsg = exchange.getInMessage();
- if (inMsg != null) {
- if (null != responseContext) {
- responseContext.putAll(inMsg);
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("set responseContext to be" + responseContext);
+
+ // Wait for a response if we need to
+ if (!oi.getOperationInfo().isOneWay()) {
+ synchronized (exchange) {
+ waitResponse(exchange);
}
}
- resList = inMsg.getContent(List.class);
- }
-
- // check for an incoming fault
- ex = getException(exchange);
-
- if (ex != null) {
- throw ex;
- }
-
- if (resList != null) {
- return resList.toArray();
+ getConduitSelector().complete(exchange);
+
+ // Grab the response objects if there are any
+ List resList = null;
+ Message inMsg = exchange.getInMessage();
+ if (inMsg != null) {
+ if (null != responseContext) {
+ responseContext.putAll(inMsg);
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("set responseContext to be" + responseContext);
+ }
+ }
+ resList = inMsg.getContent(List.class);
+ }
+
+ // check for an incoming fault
+ ex = getException(exchange);
+
+ if (ex != null) {
+ throw ex;
+ }
+
+ if (resList != null) {
+ return resList.toArray();
+ }
+ return null;
+ } finally {
+ BusFactory.setThreadDefaultBus(origBus);
}
- return null;
}
protected Exception getException(Exchange exchange) {
@@ -405,6 +413,8 @@
chain.setFaultObserver(outFaultObserver);
+ Bus origBus = BusFactory.getThreadDefaultBus(false);
+ BusFactory.setThreadDefaultBus(bus);
// execute chain
try {
String startingAfterInterceptorID = (String) message.get(
@@ -426,6 +436,7 @@
message.getExchange().notifyAll();
}
}
+ BusFactory.setThreadDefaultBus(origBus);
}
}
Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java Wed Dec 5 13:27:05 2007
@@ -24,6 +24,7 @@
import java.util.logging.Logger;
import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.message.Exchange;
@@ -47,50 +48,58 @@
public void onMessage(Message message) {
assert null != message;
- Exchange exchange = message.getExchange();
-
- Message faultMessage = null;
-
- // now that we have switched over to the fault chain,
- // prevent any further operations on the in/out message
-
- if (isOutboundObserver()) {
- Exception ex = message.getContent(Exception.class);
- if (!(ex instanceof Fault)) {
- ex = new Fault(ex);
- }
- FaultMode mode = (FaultMode)message.get(FaultMode.class);
-
- faultMessage = exchange.getOutMessage();
- if (null == faultMessage) {
- faultMessage = exchange.get(Endpoint.class).getBinding().createMessage();
- }
- faultMessage.setContent(Exception.class, ex);
- if (null != mode) {
- faultMessage.put(FaultMode.class, mode);
- }
- exchange.setOutMessage(null);
- exchange.setOutFaultMessage(faultMessage);
- if (message.get(BindingFaultInfo.class) != null) {
- faultMessage.put(BindingFaultInfo.class, message.get(BindingFaultInfo.class));
- }
- } else {
- faultMessage = message;
- exchange.setInMessage(null);
- exchange.setInFaultMessage(faultMessage);
- }
-
-
- // setup chain
- PhaseInterceptorChain chain = new PhaseInterceptorChain(getPhases());
- initializeInterceptors(faultMessage.getExchange(), chain);
- faultMessage.setInterceptorChain(chain);
+ Bus origBus = BusFactory.getThreadDefaultBus(false);
+ BusFactory.setThreadDefaultBus(bus);
try {
- chain.doIntercept(faultMessage);
- } catch (Exception exc) {
- LogUtils.log(LOG, Level.SEVERE, "Error occurred during error handling, give up!", exc);
- throw new RuntimeException(exc.getCause());
+
+ Exchange exchange = message.getExchange();
+
+ Message faultMessage = null;
+
+ // now that we have switched over to the fault chain,
+ // prevent any further operations on the in/out message
+
+ if (isOutboundObserver()) {
+ Exception ex = message.getContent(Exception.class);
+ if (!(ex instanceof Fault)) {
+ ex = new Fault(ex);
+ }
+ FaultMode mode = (FaultMode)message.get(FaultMode.class);
+
+ faultMessage = exchange.getOutMessage();
+ if (null == faultMessage) {
+ faultMessage = exchange.get(Endpoint.class).getBinding().createMessage();
+ }
+ faultMessage.setContent(Exception.class, ex);
+ if (null != mode) {
+ faultMessage.put(FaultMode.class, mode);
+ }
+ exchange.setOutMessage(null);
+ exchange.setOutFaultMessage(faultMessage);
+ if (message.get(BindingFaultInfo.class) != null) {
+ faultMessage.put(BindingFaultInfo.class, message.get(BindingFaultInfo.class));
+ }
+ } else {
+ faultMessage = message;
+ exchange.setInMessage(null);
+ exchange.setInFaultMessage(faultMessage);
+ }
+
+
+ // setup chain
+ PhaseInterceptorChain chain = new PhaseInterceptorChain(getPhases());
+ initializeInterceptors(faultMessage.getExchange(), chain);
+
+ faultMessage.setInterceptorChain(chain);
+ try {
+ chain.doIntercept(faultMessage);
+ } catch (Exception exc) {
+ LogUtils.log(LOG, Level.SEVERE, "Error occurred during error handling, give up!", exc);
+ throw new RuntimeException(exc.getCause());
+ }
+ } finally {
+ BusFactory.setThreadDefaultBus(origBus);
}
}
Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java Wed Dec 5 13:27:05 2007
@@ -26,6 +26,7 @@
import javax.xml.namespace.QName;
import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
import org.apache.cxf.binding.Binding;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.message.Exchange;
@@ -51,27 +52,33 @@
}
public void onMessage(Message m) {
- Message message = getBinding().createMessage(m);
- Exchange exchange = message.getExchange();
- if (exchange == null) {
- exchange = new ExchangeImpl();
- exchange.setInMessage(message);
+ Bus origBus = BusFactory.getThreadDefaultBus(false);
+ BusFactory.setThreadDefaultBus(bus);
+ try {
+ Message message = getBinding().createMessage(m);
+ Exchange exchange = message.getExchange();
+ if (exchange == null) {
+ exchange = new ExchangeImpl();
+ exchange.setInMessage(message);
+ }
+ setExchangeProperties(exchange, message);
+
+ // setup chain
+ PhaseInterceptorChain chain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
+ bus.getInInterceptors(),
+ endpoint.getInInterceptors(),
+ getBinding().getInInterceptors(),
+ endpoint.getService().getInInterceptors());
+
+
+ message.setInterceptorChain(chain);
+
+ chain.setFaultObserver(endpoint.getOutFaultObserver());
+
+ chain.doIntercept(message);
+ } finally {
+ BusFactory.setThreadDefaultBus(origBus);
}
- setExchangeProperties(exchange, message);
-
- // setup chain
- PhaseInterceptorChain chain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
- bus.getInInterceptors(),
- endpoint.getInInterceptors(),
- getBinding().getInInterceptors(),
- endpoint.getService().getInInterceptors());
-
-
- message.setInterceptorChain(chain);
-
- chain.setFaultObserver(endpoint.getOutFaultObserver());
-
- chain.doIntercept(message);
}
Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java Wed Dec 5 13:27:05 2007
@@ -24,6 +24,7 @@
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.message.Exchange;
@@ -54,32 +55,38 @@
}
public void onMessage(Message message) {
- message = createMessage(message);
- Exchange exchange = message.getExchange();
- if (exchange == null) {
- exchange = new ExchangeImpl();
- exchange.setInMessage(message);
+ Bus origBus = BusFactory.getThreadDefaultBus(false);
+ BusFactory.setThreadDefaultBus(bus);
+ try {
+ message = createMessage(message);
+ Exchange exchange = message.getExchange();
+ if (exchange == null) {
+ exchange = new ExchangeImpl();
+ exchange.setInMessage(message);
+ }
+ setExchangeProperties(exchange, message);
+
+ // setup chain
+ PhaseInterceptorChain chain = createChain();
+
+ message.setInterceptorChain(chain);
+
+ chain.add(bus.getInInterceptors());
+ if (bindingInterceptors != null) {
+ chain.add(bindingInterceptors);
+ }
+ if (routingInterceptors != null) {
+ chain.add(routingInterceptors);
+ }
+
+ if (endpoints != null) {
+ exchange.put(ENDPOINTS, endpoints);
+ }
+
+ chain.doIntercept(message);
+ } finally {
+ BusFactory.setThreadDefaultBus(origBus);
}
- setExchangeProperties(exchange, message);
-
- // setup chain
- PhaseInterceptorChain chain = createChain();
-
- message.setInterceptorChain(chain);
-
- chain.add(bus.getInInterceptors());
- if (bindingInterceptors != null) {
- chain.add(bindingInterceptors);
- }
- if (routingInterceptors != null) {
- chain.add(routingInterceptors);
- }
-
- if (endpoints != null) {
- exchange.put(ENDPOINTS, endpoints);
- }
-
- chain.doIntercept(message);
}
/**
Modified: incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java (original)
+++ incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java Wed Dec 5 13:27:05 2007
@@ -54,6 +54,7 @@
import javax.xml.ws.soap.SOAPFaultException;
import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
import org.apache.cxf.binding.soap.SoapBinding;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Client;
@@ -127,94 +128,100 @@
LOG.info("Dispatch: invoke called");
}
- Endpoint endpoint = getEndpoint();
- Message message = endpoint.getBinding().createMessage();
-
- if (context != null) {
- message.setContent(JAXBContext.class, context);
- }
-
-
- Map<String, Object> reqContext = new HashMap<String, Object>(this.getRequestContext());
- Map<String, Object> respContext = this.getResponseContext();
- // clear the response context's hold information
- // Not call the clear Context is to avoid the error
- // that getResponseContext() would be called by Client code first
- respContext.clear();
-
- ContextPropertiesMapping.mapRequestfromJaxws2Cxf(reqContext);
- message.putAll(reqContext);
- //need to do context mapping from jax-ws to cxf message
-
- Exchange exchange = new ExchangeImpl();
-
- exchange.setOutMessage(message);
- setExchangeProperties(exchange, endpoint);
-
- message.setContent(Object.class, obj);
-
- if (obj instanceof SOAPMessage) {
- message.setContent(SOAPMessage.class, obj);
- } else if (obj instanceof Source) {
- message.setContent(Source.class, obj);
- } else if (obj instanceof DataSource) {
- message.setContent(DataSource.class, obj);
- }
-
- message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
-
- PhaseInterceptorChain chain = getDispatchOutChain(endpoint);
- message.setInterceptorChain(chain);
-
- // setup conduit selector
- prepareConduitSelector(message);
-
- // execute chain
- chain.doIntercept(message);
-
-
- if (message.getContent(Exception.class) != null) {
- getConduitSelector().complete(exchange);
- if (getBinding() instanceof SOAPBinding) {
- try {
- SOAPFault soapFault = SOAPFactory.newInstance().createFault();
- Fault fault = (Fault)message.getContent(Exception.class);
- soapFault.setFaultCode(fault.getFaultCode());
- soapFault.setFaultString(fault.getMessage());
- SOAPFaultException exception = new SOAPFaultException(soapFault);
+ Bus origBus = BusFactory.getThreadDefaultBus(false);
+ BusFactory.setThreadDefaultBus(bus);
+ try {
+ Endpoint endpoint = getEndpoint();
+ Message message = endpoint.getBinding().createMessage();
+
+ if (context != null) {
+ message.setContent(JAXBContext.class, context);
+ }
+
+
+ Map<String, Object> reqContext = new HashMap<String, Object>(this.getRequestContext());
+ Map<String, Object> respContext = this.getResponseContext();
+ // clear the response context's hold information
+ // Not call the clear Context is to avoid the error
+ // that getResponseContext() would be called by Client code first
+ respContext.clear();
+
+ ContextPropertiesMapping.mapRequestfromJaxws2Cxf(reqContext);
+ message.putAll(reqContext);
+ //need to do context mapping from jax-ws to cxf message
+
+ Exchange exchange = new ExchangeImpl();
+
+ exchange.setOutMessage(message);
+ setExchangeProperties(exchange, endpoint);
+
+ message.setContent(Object.class, obj);
+
+ if (obj instanceof SOAPMessage) {
+ message.setContent(SOAPMessage.class, obj);
+ } else if (obj instanceof Source) {
+ message.setContent(Source.class, obj);
+ } else if (obj instanceof DataSource) {
+ message.setContent(DataSource.class, obj);
+ }
+
+ message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+
+ PhaseInterceptorChain chain = getDispatchOutChain(endpoint);
+ message.setInterceptorChain(chain);
+
+ // setup conduit selector
+ prepareConduitSelector(message);
+
+ // execute chain
+ chain.doIntercept(message);
+
+
+ if (message.getContent(Exception.class) != null) {
+ getConduitSelector().complete(exchange);
+ if (getBinding() instanceof SOAPBinding) {
+ try {
+ SOAPFault soapFault = SOAPFactory.newInstance().createFault();
+ Fault fault = (Fault)message.getContent(Exception.class);
+ soapFault.setFaultCode(fault.getFaultCode());
+ soapFault.setFaultString(fault.getMessage());
+ SOAPFaultException exception = new SOAPFaultException(soapFault);
+ throw exception;
+ } catch (SOAPException e) {
+ throw new WebServiceException(e);
+ }
+ } else if (getBinding() instanceof HTTPBinding) {
+ HTTPException exception = new HTTPException(HttpURLConnection.HTTP_INTERNAL_ERROR);
+ exception.initCause(message.getContent(Exception.class));
throw exception;
- } catch (SOAPException e) {
- throw new WebServiceException(e);
+ } else {
+ throw new WebServiceException(message.getContent(Exception.class));
}
- } else if (getBinding() instanceof HTTPBinding) {
- HTTPException exception = new HTTPException(HttpURLConnection.HTTP_INTERNAL_ERROR);
- exception.initCause(message.getContent(Exception.class));
- throw exception;
+ }
+
+ // correlate response
+ if (getConduitSelector().selectConduit(message).getBackChannel() != null) {
+ // process partial response and wait for decoupled response
} else {
- throw new WebServiceException(message.getContent(Exception.class));
+ // process response: send was synchronous so when we get here we can assume that the
+ // Exchange's inbound message is set and had been passed through the inbound
+ // interceptor chain.
}
- }
-
- // correlate response
- if (getConduitSelector().selectConduit(message).getBackChannel() != null) {
- // process partial response and wait for decoupled response
- } else {
- // process response: send was synchronous so when we get here we can assume that the
- // Exchange's inbound message is set and had been passed through the inbound interceptor chain.
- }
-
- if (!isOneWay) {
- synchronized (exchange) {
- Message inMsg = waitResponse(exchange);
- respContext.putAll(inMsg);
- getConduitSelector().complete(exchange);
- //need to do context mapping from cxf message to jax-ws
- ContextPropertiesMapping.mapResponsefromCxf2Jaxws(respContext);
- return cl.cast(inMsg.getContent(Object.class));
+
+ if (!isOneWay) {
+ synchronized (exchange) {
+ Message inMsg = waitResponse(exchange);
+ respContext.putAll(inMsg);
+ getConduitSelector().complete(exchange);
+ //need to do context mapping from cxf message to jax-ws
+ ContextPropertiesMapping.mapResponsefromCxf2Jaxws(respContext);
+ return cl.cast(inMsg.getContent(Object.class));
+ }
}
- }
- return null;
-
+ return null;
+ } finally {
+ BusFactory.setThreadDefaultBus(origBus);
+ }
}
private Message waitResponse(Exchange exchange) {
@@ -308,6 +315,8 @@
chain.add(inInterceptors);
// execute chain
+ Bus origBus = BusFactory.getThreadDefaultBus(false);
+ BusFactory.setThreadDefaultBus(bus);
try {
chain.doIntercept(message);
} finally {
@@ -316,6 +325,7 @@
message.getExchange().setInMessage(message);
message.getExchange().notifyAll();
}
+ BusFactory.setThreadDefaultBus(origBus);
}
}
Modified: incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java Wed Dec 5 13:27:05 2007
@@ -31,6 +31,7 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.helpers.HttpHeaderHelper;
@@ -237,7 +238,12 @@
}
// REVISIT: service on executor if associated with endpoint
- serviceRequest(req, resp);
+ try {
+ BusFactory.setThreadDefaultBus(bus);
+ serviceRequest(req, resp);
+ } finally {
+ BusFactory.setThreadDefaultBus(null);
+ }
}
protected void serviceRequest(final HttpServletRequest req, final HttpServletResponse resp)
Modified: incubator/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java (original)
+++ incubator/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java Wed Dec 5 13:27:05 2007
@@ -33,6 +33,7 @@
import javax.xml.namespace.QName;
import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
import org.apache.cxf.bus.CXFBusImpl;
import org.apache.cxf.common.util.Base64Utility;
import org.apache.cxf.common.util.StringUtils;
@@ -81,6 +82,7 @@
private static final String DIGEST_CHALLENGE = "Digest realm=luna";
private static final String CUSTOM_CHALLENGE = "Custom realm=sol";
private Bus bus;
+ private Bus threadDefaultBus;
private Conduit decoupledBackChannel;
private EndpointInfo endpointInfo;
private EndpointReferenceType address;
@@ -137,6 +139,7 @@
is = null;
os = null;
destination = null;
+ BusFactory.setDefaultBus(null);
}
@Test
@@ -194,10 +197,17 @@
@Test
public void testDoService() throws Exception {
+ Bus defaultBus = new CXFBusImpl();
+ assertSame("Default thread bus has not been set",
+ defaultBus, BusFactory.getThreadDefaultBus());
destination = setUpDestination(false, false);
setUpDoService(false);
+ assertSame("Default thread bus has been unexpectedly reset",
+ defaultBus, BusFactory.getThreadDefaultBus());
destination.doService(request, response);
verifyDoService();
+ assertSame("Default thread bus has not been reset",
+ defaultBus, BusFactory.getThreadDefaultBus());
}
@Test
@@ -491,6 +501,7 @@
observer = new MessageObserver() {
public void onMessage(Message m) {
inMessage = m;
+ threadDefaultBus = BusFactory.getThreadDefaultBus();
}
};
dest.setMessageObserver(observer);
@@ -704,6 +715,8 @@
}
private void verifyDoService() throws Exception {
+ assertSame("Default thread bus has not been set for request",
+ bus, threadDefaultBus);
assertNotNull("unexpected null message", inMessage);
assertSame("unexpected HTTP request",
inMessage.get(JettyHTTPDestination.HTTP_REQUEST),
Modified: incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=601535&r1=601534&r2=601535&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Wed Dec 5 13:27:05 2007
@@ -41,6 +41,7 @@
import javax.naming.NamingException;
import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.Configurable;
import org.apache.cxf.configuration.Configurer;
@@ -195,13 +196,17 @@
inMessage.setDestination(this);
+ BusFactory.setThreadDefaultBus(bus);
+
//handle the incoming message
incomingObserver.onMessage(inMessage);
} catch (JMSException jmsex) {
//TODO: need to revisit for which exception should we throw.
throw new IOException(jmsex.getMessage());
- }
+ } finally {
+ BusFactory.setThreadDefaultBus(null);
+ }
}
public void connected(javax.jms.Destination target,