You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by bh...@apache.org on 2009/05/22 13:32:34 UTC
svn commit: r777481 -
/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
Author: bharath
Date: Fri May 22 11:32:33 2009
New Revision: 777481
URL: http://svn.apache.org/viewvc?rev=777481&view=rev
Log:
Fix for CXF-2164.
As discussed on the Jira, we now maintain a WeakHashMap for the request and response contexts. (With the Thread as the key). This would mean the contexts would get GC's when the ClientImpl gets GC'd (during undeployment).
Modified:
cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?rev=777481&r1=777480&r2=777481&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Fri May 22 11:32:33 2009
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.logging.Level;
@@ -68,29 +69,27 @@
public class ClientImpl
extends AbstractBasicInterceptorProvider
implements Client, Retryable, MessageObserver {
-
+
public static final String THREAD_LOCAL_REQUEST_CONTEXT = "thread.local.request.context";
-
+
public static final String FINISHED = "exchange.finished";
-
+
private static final Logger LOG = LogUtils.getL7dLogger(ClientImpl.class);
-
+
protected Bus bus;
protected ConduitSelector conduitSelector;
- protected ClientOutFaultObserver outFaultObserver;
+ protected ClientOutFaultObserver outFaultObserver;
protected int synchronousTimeout = 60000; // default 60 second timeout
-
+
protected PhaseChainCache outboundChainCache = new PhaseChainCache();
protected PhaseChainCache inboundChainCache = new PhaseChainCache();
-
+
protected Map<String, Object> currentRequestContext = new ConcurrentHashMap<String, Object>();
- protected ThreadLocal <EchoContext> requestContext =
- new ThreadLocal<EchoContext>();
+ protected Map<Thread, EchoContext> requestContext = new WeakHashMap<Thread, EchoContext>();
+
+ protected Map<Thread, Map<String, Object>> responseContext = new WeakHashMap<Thread, Map<String, Object>>();
- protected ThreadLocal <Map<String, Object>> responseContext =
- new ThreadLocal<Map<String, Object>>();
-
protected Executor executor;
@@ -101,7 +100,7 @@
public ClientImpl(Bus b, Endpoint e, Conduit c) {
this(b, e, new PreexistingConduitSelector(c));
}
-
+
public ClientImpl(Bus b, Endpoint e, ConduitSelector sc) {
bus = b;
outFaultObserver = new ClientOutFaultObserver(bus);
@@ -112,7 +111,7 @@
public ClientImpl(URL wsdlUrl) {
this(BusFactory.getThreadDefaultBus(), wsdlUrl, null, null, SimpleEndpointImplFactory.getSingleton());
}
-
+
public ClientImpl(URL wsdlUrl, QName port) {
this(BusFactory.getThreadDefaultBus(), wsdlUrl, null, port, SimpleEndpointImplFactory.getSingleton());
}
@@ -136,14 +135,14 @@
* @param port
* @param endpointImplFactory
*/
- public ClientImpl(Bus bus, URL wsdlUrl, QName service,
+ public ClientImpl(Bus bus, URL wsdlUrl, QName service,
QName port, EndpointImplFactory endpointImplFactory) {
this.bus = bus;
-
+
WSDLServiceFactory sf = (service == null)
? (new WSDLServiceFactory(bus, wsdlUrl)) : (new WSDLServiceFactory(bus, wsdlUrl, service));
Service svc = sf.create();
-
+
EndpointInfo epfo = findEndpoint(svc, port);
try {
@@ -153,17 +152,17 @@
}
notifyLifecycleManager();
}
-
+
public void destroy() {
-
+
// TODO: also inform the conduit so it can shutdown any response listeners
-
+
ClientLifeCycleManager mgr = bus.getExtension(ClientLifeCycleManager.class);
if (null != mgr) {
mgr.clientDestroyed(this);
}
}
-
+
private void notifyLifecycleManager() {
ClientLifeCycleManager mgr = bus.getExtension(ClientLifeCycleManager.class);
if (null != mgr) {
@@ -184,7 +183,7 @@
for (ServiceInfo svcfo : svc.getServiceInfos()) {
for (EndpointInfo e : svcfo.getEndpoints()) {
BindingInfo bfo = e.getBinding();
-
+
if (bfo.getBindingId().equals("http://schemas.xmlsoap.org/wsdl/soap/")) {
for (Object o : bfo.getExtensors().get()) {
if (o instanceof SOAPBindingImpl) {
@@ -195,7 +194,7 @@
}
}
}
-
+
}
}
}
@@ -212,21 +211,21 @@
return getConduitSelector().getEndpoint();
}
-
+
public Map<String, Object> getRequestContext() {
if (isThreadLocalRequestContext()) {
- if (null == requestContext.get()) {
- requestContext.set(new EchoContext(currentRequestContext));
+ if (!requestContext.containsKey(Thread.currentThread())) {
+ requestContext.put(Thread.currentThread(), new EchoContext(currentRequestContext));
}
- return requestContext.get();
+ return requestContext.get(Thread.currentThread());
}
return currentRequestContext;
}
public Map<String, Object> getResponseContext() {
- if (null == responseContext.get()) {
- responseContext.set(new HashMap<String, Object>());
- }
- return responseContext.get();
+ if (!responseContext.containsKey(Thread.currentThread())) {
+ responseContext.put(Thread.currentThread(), new HashMap<String, Object>());
+ }
+ return responseContext.get(Thread.currentThread());
}
public boolean isThreadLocalRequestContext() {
@@ -246,37 +245,37 @@
currentRequestContext.put(THREAD_LOCAL_REQUEST_CONTEXT, b);
}
-
+
public Object[] invoke(BindingOperationInfo oi, Object... params) throws Exception {
return invoke(oi, params, null);
}
public Object[] invoke(String operationName, Object... params) throws Exception {
QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
-
+
return invoke(q, params);
}
-
+
public Object[] invoke(QName operationName, Object... params) throws Exception {
BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
if (op == null) {
throw new UncheckedException(
new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
}
-
+
if (op.isUnwrappedCapable()) {
op = op.getUnwrappedOperation();
}
-
+
return invoke(op, params);
}
public Object[] invokeWrapped(String operationName, Object... params) throws Exception {
QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
-
+
return invokeWrapped(q, params);
}
-
+
public Object[] invokeWrapped(QName operationName, Object... params) throws Exception {
BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
if (op == null) {
@@ -287,7 +286,7 @@
}
public Object[] invoke(BindingOperationInfo oi,
- Object[] params,
+ Object[] params,
Exchange exchange) throws Exception {
Map<String, Object> context = new HashMap<String, Object>();
Map<String, Object> resp = getResponseContext();
@@ -298,11 +297,11 @@
try {
return invoke(oi, params, context, exchange);
} finally {
- responseContext.set(resp);
+ responseContext.put(Thread.currentThread(), resp);
}
}
public Object[] invoke(BindingOperationInfo oi,
- Object[] params,
+ Object[] params,
Map<String, Object> context) throws Exception {
try {
return invoke(oi, params, context, (Exchange)null);
@@ -310,46 +309,46 @@
if (context != null) {
Map<String, Object> resp = CastUtils.cast((Map<?, ?>)context.get(RESPONSE_CONTEXT));
if (resp != null) {
- responseContext.set(resp);
+ responseContext.put(Thread.currentThread(), resp);
}
}
}
}
-
- public void invoke(ClientCallback callback,
- String operationName,
+
+ public void invoke(ClientCallback callback,
+ String operationName,
Object... params) throws Exception {
QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
- invoke(callback, q, params);
+ invoke(callback, q, params);
}
- public void invoke(ClientCallback callback,
- QName operationName,
+ public void invoke(ClientCallback callback,
+ QName operationName,
Object... params) throws Exception {
BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
if (op == null) {
throw new UncheckedException(
new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
}
-
+
if (op.isUnwrappedCapable()) {
op = op.getUnwrappedOperation();
}
-
+
invoke(callback, op, params);
}
- public void invokeWrapped(ClientCallback callback,
- String operationName,
+ public void invokeWrapped(ClientCallback callback,
+ String operationName,
Object... params)
throws Exception {
QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
- invokeWrapped(callback, q, params);
+ invokeWrapped(callback, q, params);
}
- public void invokeWrapped(ClientCallback callback,
- QName operationName,
+ public void invokeWrapped(ClientCallback callback,
+ QName operationName,
Object... params)
throws Exception {
BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
@@ -360,9 +359,9 @@
invoke(callback, op, params);
}
-
- public void invoke(ClientCallback callback,
- BindingOperationInfo oi,
+
+ public void invoke(ClientCallback callback,
+ BindingOperationInfo oi,
Object... params) throws Exception {
Bus origBus = BusFactory.getThreadDefaultBus(false);
BusFactory.setThreadDefaultBus(bus);
@@ -379,42 +378,42 @@
Message message = endpoint.getBinding().createMessage();
message.put(Message.INVOCATION_CONTEXT, context);
-
+
//setup the message context
setContext(reqContext, message);
setParameters(params, message);
-
+
if (null != reqContext) {
exchange.putAll(reqContext);
}
exchange.setOneWay(oi.getOutput() == null);
exchange.setOutMessage(message);
exchange.put(ClientCallback.class, callback);
-
+
setOutMessageProperties(message, oi);
setExchangeProperties(exchange, endpoint, oi);
-
+
// setup chain
-
+
PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
message.setInterceptorChain(chain);
-
+
modifyChain(chain, reqContext);
chain.setFaultObserver(outFaultObserver);
-
+
// setup conduit selector
prepareConduitSelector(message);
-
- // execute chain
+
+ // execute chain
chain.doIntercept(message);
} finally {
BusFactory.setThreadDefaultBus(origBus);
- }
+ }
}
-
+
public Object[] invoke(BindingOperationInfo oi,
- Object[] params,
+ Object[] params,
Map<String, Object> context,
Exchange exchange) throws Exception {
Bus origBus = BusFactory.getThreadDefaultBus(false);
@@ -425,7 +424,7 @@
}
exchange.setSynchronous(true);
Endpoint endpoint = getEndpoint();
-
+
Map<String, Object> reqContext = null;
Map<String, Object> resContext = null;
if (LOG.isLoggable(Level.FINE)) {
@@ -436,46 +435,46 @@
reqContext = CastUtils.cast((Map)context.get(REQUEST_CONTEXT));
resContext = CastUtils.cast((Map)context.get(RESPONSE_CONTEXT));
message.put(Message.INVOCATION_CONTEXT, context);
- }
+ }
//setup the message context
setContext(reqContext, message);
setParameters(params, message);
-
+
if (null != reqContext) {
exchange.putAll(reqContext);
}
-
+
if (null != oi) {
exchange.setOneWay(oi.getOutput() == null);
}
-
+
exchange.setOutMessage(message);
-
+
setOutMessageProperties(message, oi);
setExchangeProperties(exchange, endpoint, oi);
-
+
// setup chain
-
+
PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
message.setInterceptorChain(chain);
-
+
modifyChain(chain, reqContext);
chain.setFaultObserver(outFaultObserver);
-
+
// setup conduit selector
prepareConduitSelector(message);
-
- // execute chain
+
+ // execute chain
chain.doIntercept(message);
-
+
return processResult(message, exchange, oi, resContext);
-
+
} finally {
BusFactory.setThreadDefaultBus(origBus);
}
}
- protected Object[] processResult(Message message,
+ protected Object[] processResult(Message message,
Exchange exchange,
BindingOperationInfo oi,
Map<String, Object> resContext) throws Exception {
@@ -496,14 +495,14 @@
}
throw ex;
}
-
+
// Wait for a response if we need to
if (oi != null && !oi.getOperationInfo().isOneWay()) {
synchronized (exchange) {
waitResponse(exchange);
}
}
-
+
// leave the input stream open for the caller
Boolean keepConduitAlive = (Boolean)exchange.get(Client.KEEP_CONDUIT_ALIVE);
if (keepConduitAlive == null || !keepConduitAlive) {
@@ -514,7 +513,7 @@
List resList = null;
Message inMsg = exchange.getInMessage();
if (inMsg != null) {
- if (null != resContext) {
+ if (null != resContext) {
resContext.putAll(inMsg);
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("set responseContext to be" + responseContext);
@@ -522,18 +521,18 @@
}
resList = inMsg.getContent(List.class);
}
-
+
// check for an incoming fault
ex = getException(exchange);
-
+
if (ex != null) {
throw ex;
}
-
+
if (resList != null) {
return resList.toArray();
}
-
+
return null;
}
protected Exception getException(Exchange exchange) {
@@ -541,17 +540,17 @@
return exchange.getInFaultMessage().getContent(Exception.class);
} else if (exchange.getOutFaultMessage() != null) {
return exchange.getOutFaultMessage().getContent(Exception.class);
- }
+ }
return null;
}
protected void setContext(Map<String, Object> ctx, Message message) {
- if (ctx != null) {
+ if (ctx != null) {
message.putAll(ctx);
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("set requestContext to message be" + ctx);
}
- }
+ }
}
protected void waitResponse(Exchange exchange) {
@@ -576,24 +575,24 @@
MessageContentsList contents = new MessageContentsList(params);
message.setContent(List.class, contents);
}
-
+
public void onMessage(Message message) {
Endpoint endpoint = message.getExchange().get(Endpoint.class);
if (endpoint == null) {
// in this case correlation will occur outside the transport,
- // however there's a possibility that the endpoint may have been
+ // however there's a possibility that the endpoint may have been
// rebased in the meantime, so that the response will be mediated
// via a set of in interceptors provided by a *different* endpoint
//
endpoint = getConduitSelector().getEndpoint();
- message.getExchange().put(Endpoint.class, endpoint);
+ message.getExchange().put(Endpoint.class, endpoint);
}
message = endpoint.getBinding().createMessage(message);
message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
PhaseManager pm = bus.getExtension(PhaseManager.class);
-
+
List<Interceptor> i1 = bus.getInInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by bus: " + i1);
@@ -610,12 +609,12 @@
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by binding: " + i4);
}
-
- PhaseInterceptorChain chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4);
+
+ PhaseInterceptorChain chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4);
message.setInterceptorChain(chain);
-
+
chain.setFaultObserver(outFaultObserver);
-
+
Bus origBus = BusFactory.getThreadDefaultBus(false);
BusFactory.setThreadDefaultBus(bus);
// execute chain
@@ -628,7 +627,7 @@
}
callback.start(message);
}
-
+
String startingAfterInterceptorID = (String) message.get(
PhaseInterceptorChain.STARTING_AFTER_INTERCEPTOR_ID);
String startingInterceptorID = (String) message.get(
@@ -642,9 +641,9 @@
} else {
chain.doIntercept(message);
}
-
+
callback = message.getExchange().get(ClientCallback.class);
-
+
if (callback != null && !isPartialResponse(message)) {
message.getExchange().setInMessage(message);
Map<String, Object> resCtx = CastUtils.cast((Map<?, ?>)message
@@ -652,11 +651,11 @@
.getOutMessage()
.get(Message.INVOCATION_CONTEXT));
resCtx = CastUtils.cast((Map<?, ?>)resCtx.get(RESPONSE_CONTEXT));
-
+
try {
Object obj[] = processResult(message, message.getExchange(),
null, resCtx);
-
+
callback.handleResponse(resCtx, obj);
} catch (Throwable ex) {
callback.handleException(resCtx, ex);
@@ -667,7 +666,7 @@
if (!isPartialResponse(message) && callback == null) {
message.getExchange().put(FINISHED, Boolean.TRUE);
message.getExchange().setInMessage(message);
- message.getExchange().notifyAll();
+ message.getExchange().notifyAll();
}
}
BusFactory.setThreadDefaultBus(origBus);
@@ -695,7 +694,7 @@
message.put(MessageInfo.class, boi.getOperationInfo().getInput());
}
}
-
+
protected void setExchangeProperties(Exchange exchange,
Endpoint endpoint,
BindingOperationInfo boi) {
@@ -713,7 +712,7 @@
exchange.put(BindingOperationInfo.class, boi);
exchange.put(OperationInfo.class, boi.getOperationInfo());
}
-
+
if (exchange.isSynchronous() || executor == null) {
exchange.put(MessageObserver.class, this);
} else {
@@ -725,7 +724,7 @@
}
});
}
- });
+ });
}
exchange.put(Retryable.class, this);
exchange.put(Client.class, this);
@@ -758,10 +757,10 @@
}
}
- protected PhaseInterceptorChain setupInterceptorChain(Endpoint endpoint) {
+ protected PhaseInterceptorChain setupInterceptorChain(Endpoint endpoint) {
PhaseManager pm = bus.getExtension(PhaseManager.class);
-
+
List<Interceptor> i1 = bus.getOutInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by bus: " + i1);
@@ -796,17 +795,17 @@
public void setSynchronousTimeout(int synchronousTimeout) {
this.synchronousTimeout = synchronousTimeout;
}
-
+
public final ConduitSelector getConduitSelector() {
return getConduitSelector(null);
}
-
+
protected final synchronized ConduitSelector getConduitSelector(
ConduitSelector override
) {
if (null == conduitSelector) {
setConduitSelector(override != null
- ? override
+ ? override
: new UpfrontConduitSelector());
}
return conduitSelector;
@@ -819,13 +818,13 @@
private boolean isPartialResponse(Message in) {
return Boolean.TRUE.equals(in.get(Message.PARTIAL_RESPONSE_MESSAGE));
}
-
-
+
+
/*
* modification are echoed back to the shared map
*/
public static class EchoContext extends HashMap<String, Object> {
- final Map<String, Object> shared;
+ final Map<String, Object> shared;
public EchoContext(Map<String, Object> sharedMap) {
super(sharedMap);
shared = sharedMap;
@@ -840,12 +839,12 @@
shared.putAll(t);
super.putAll(t);
}
-
+
public Object remove(Object key) {
shared.remove(key);
return super.remove(key);
}
-
+
public void reload() {
super.clear();
super.putAll(shared);