You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by bh...@apache.org on 2009/05/22 13:32:34 UTC

svn commit: r777481 - /cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java

Author: bharath
Date: Fri May 22 11:32:33 2009
New Revision: 777481

URL: http://svn.apache.org/viewvc?rev=777481&view=rev
Log:
Fix for CXF-2164. 
As discussed on the Jira, we now maintain a WeakHashMap (with the Thread as the key) for the request and response contexts. This means the contexts get GC'd when the ClientImpl gets GC'd (during undeployment).

Modified:
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?rev=777481&r1=777480&r2=777481&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Fri May 22 11:32:33 2009
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.logging.Level;
@@ -68,29 +69,27 @@
 public class ClientImpl
     extends AbstractBasicInterceptorProvider
     implements Client, Retryable, MessageObserver {
-    
+
     public static final String THREAD_LOCAL_REQUEST_CONTEXT = "thread.local.request.context";
 
-    
+
     public static final String FINISHED = "exchange.finished";
-    
+
     private static final Logger LOG = LogUtils.getL7dLogger(ClientImpl.class);
-    
+
     protected Bus bus;
     protected ConduitSelector conduitSelector;
-    protected ClientOutFaultObserver outFaultObserver; 
+    protected ClientOutFaultObserver outFaultObserver;
     protected int synchronousTimeout = 60000; // default 60 second timeout
-    
+
     protected PhaseChainCache outboundChainCache = new PhaseChainCache();
     protected PhaseChainCache inboundChainCache = new PhaseChainCache();
-    
+
     protected Map<String, Object> currentRequestContext = new ConcurrentHashMap<String, Object>();
-    protected ThreadLocal <EchoContext> requestContext =
-        new ThreadLocal<EchoContext>();
+    protected Map<Thread, EchoContext> requestContext = new WeakHashMap<Thread, EchoContext>();
+
+    protected Map<Thread, Map<String, Object>> responseContext = new WeakHashMap<Thread, Map<String, Object>>();
 
-    protected ThreadLocal <Map<String, Object>> responseContext =
-            new ThreadLocal<Map<String, Object>>();
-    
     protected Executor executor;
 
 
@@ -101,7 +100,7 @@
     public ClientImpl(Bus b, Endpoint e, Conduit c) {
        this(b, e, new PreexistingConduitSelector(c));
     }
-    
+
     public ClientImpl(Bus b, Endpoint e, ConduitSelector sc) {
         bus = b;
         outFaultObserver = new ClientOutFaultObserver(bus);
@@ -112,7 +111,7 @@
     public ClientImpl(URL wsdlUrl) {
         this(BusFactory.getThreadDefaultBus(), wsdlUrl, null, null, SimpleEndpointImplFactory.getSingleton());
     }
-    
+
     public ClientImpl(URL wsdlUrl, QName port) {
         this(BusFactory.getThreadDefaultBus(), wsdlUrl, null, port, SimpleEndpointImplFactory.getSingleton());
     }
@@ -136,14 +135,14 @@
      * @param port
      * @param endpointImplFactory
      */
-    public ClientImpl(Bus bus, URL wsdlUrl, QName service, 
+    public ClientImpl(Bus bus, URL wsdlUrl, QName service,
                       QName port, EndpointImplFactory endpointImplFactory) {
         this.bus = bus;
-        
+
         WSDLServiceFactory sf = (service == null)
             ? (new WSDLServiceFactory(bus, wsdlUrl)) : (new WSDLServiceFactory(bus, wsdlUrl, service));
         Service svc = sf.create();
-    
+
         EndpointInfo epfo = findEndpoint(svc, port);
 
         try {
@@ -153,17 +152,17 @@
         }
         notifyLifecycleManager();
     }
-    
+
     public void destroy() {
-        
+
         // TODO: also inform the conduit so it can shutdown any response listeners
-        
+
         ClientLifeCycleManager mgr = bus.getExtension(ClientLifeCycleManager.class);
         if (null != mgr) {
             mgr.clientDestroyed(this);
         }
     }
-    
+
     private void notifyLifecycleManager() {
         ClientLifeCycleManager mgr = bus.getExtension(ClientLifeCycleManager.class);
         if (null != mgr) {
@@ -184,7 +183,7 @@
             for (ServiceInfo svcfo : svc.getServiceInfos()) {
                 for (EndpointInfo e : svcfo.getEndpoints()) {
                     BindingInfo bfo = e.getBinding();
-    
+
                     if (bfo.getBindingId().equals("http://schemas.xmlsoap.org/wsdl/soap/")) {
                         for (Object o : bfo.getExtensors().get()) {
                             if (o instanceof SOAPBindingImpl) {
@@ -195,7 +194,7 @@
                                 }
                             }
                         }
-    
+
                     }
                 }
             }
@@ -212,21 +211,21 @@
         return getConduitSelector().getEndpoint();
     }
 
-    
+
     public Map<String, Object> getRequestContext() {
         if (isThreadLocalRequestContext()) {
-            if (null == requestContext.get()) {
-                requestContext.set(new EchoContext(currentRequestContext));
+            if (!requestContext.containsKey(Thread.currentThread())) {
+                requestContext.put(Thread.currentThread(), new EchoContext(currentRequestContext));
             }
-            return requestContext.get();
+            return requestContext.get(Thread.currentThread());
         }
         return currentRequestContext;
     }
     public Map<String, Object> getResponseContext() {
-        if (null == responseContext.get()) {
-            responseContext.set(new HashMap<String, Object>());
-        }        
-        return responseContext.get();
+        if (!responseContext.containsKey(Thread.currentThread())) {
+            responseContext.put(Thread.currentThread(), new HashMap<String, Object>());
+        }
+        return responseContext.get(Thread.currentThread());
 
     }
     public boolean isThreadLocalRequestContext() {
@@ -246,37 +245,37 @@
         currentRequestContext.put(THREAD_LOCAL_REQUEST_CONTEXT, b);
     }
 
-    
+
     public Object[] invoke(BindingOperationInfo oi, Object... params) throws Exception {
         return invoke(oi, params, null);
     }
 
     public Object[] invoke(String operationName, Object... params) throws Exception {
         QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
-       
+
         return invoke(q, params);
     }
-    
+
     public Object[] invoke(QName operationName, Object... params) throws Exception {
         BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
         if (op == null) {
             throw new UncheckedException(
                 new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
         }
-        
+
         if (op.isUnwrappedCapable()) {
             op = op.getUnwrappedOperation();
         }
-        
+
         return invoke(op, params);
     }
 
     public Object[] invokeWrapped(String operationName, Object... params) throws Exception {
         QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
-       
+
         return invokeWrapped(q, params);
     }
-    
+
     public Object[] invokeWrapped(QName operationName, Object... params) throws Exception {
         BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
         if (op == null) {
@@ -287,7 +286,7 @@
     }
 
     public Object[] invoke(BindingOperationInfo oi,
-                           Object[] params, 
+                           Object[] params,
                            Exchange exchange) throws Exception {
         Map<String, Object> context = new HashMap<String, Object>();
         Map<String, Object> resp = getResponseContext();
@@ -298,11 +297,11 @@
         try {
             return invoke(oi, params, context, exchange);
         } finally {
-            responseContext.set(resp);
+            responseContext.put(Thread.currentThread(), resp);
         }
     }
     public Object[] invoke(BindingOperationInfo oi,
-                           Object[] params, 
+                           Object[] params,
                            Map<String, Object> context) throws Exception {
         try {
             return invoke(oi, params, context, (Exchange)null);
@@ -310,46 +309,46 @@
             if (context != null) {
                 Map<String, Object> resp = CastUtils.cast((Map<?, ?>)context.get(RESPONSE_CONTEXT));
                 if (resp != null) {
-                    responseContext.set(resp);
+                    responseContext.put(Thread.currentThread(), resp);
                 }
             }
         }
     }
-    
-    public void invoke(ClientCallback callback, 
-                       String operationName, 
+
+    public void invoke(ClientCallback callback,
+                       String operationName,
                        Object... params) throws Exception {
         QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
-        invoke(callback, q, params);        
+        invoke(callback, q, params);
     }
 
-    public void invoke(ClientCallback callback, 
-                       QName operationName, 
+    public void invoke(ClientCallback callback,
+                       QName operationName,
                        Object... params) throws Exception {
         BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
         if (op == null) {
             throw new UncheckedException(
                 new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
         }
-        
+
         if (op.isUnwrappedCapable()) {
             op = op.getUnwrappedOperation();
         }
-        
+
         invoke(callback, op, params);
     }
 
 
-    public void invokeWrapped(ClientCallback callback, 
-                              String operationName, 
+    public void invokeWrapped(ClientCallback callback,
+                              String operationName,
                               Object... params)
         throws Exception {
         QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
-        invokeWrapped(callback, q, params);        
+        invokeWrapped(callback, q, params);
     }
 
-    public void invokeWrapped(ClientCallback callback, 
-                              QName operationName, 
+    public void invokeWrapped(ClientCallback callback,
+                              QName operationName,
                               Object... params)
         throws Exception {
         BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
@@ -360,9 +359,9 @@
         invoke(callback, op, params);
     }
 
-    
-    public void invoke(ClientCallback callback, 
-                       BindingOperationInfo oi, 
+
+    public void invoke(ClientCallback callback,
+                       BindingOperationInfo oi,
                        Object... params) throws Exception {
         Bus origBus = BusFactory.getThreadDefaultBus(false);
         BusFactory.setThreadDefaultBus(bus);
@@ -379,42 +378,42 @@
 
             Message message = endpoint.getBinding().createMessage();
             message.put(Message.INVOCATION_CONTEXT, context);
-            
+
             //setup the message context
             setContext(reqContext, message);
             setParameters(params, message);
-    
+
             if (null != reqContext) {
                 exchange.putAll(reqContext);
             }
             exchange.setOneWay(oi.getOutput() == null);
             exchange.setOutMessage(message);
             exchange.put(ClientCallback.class, callback);
-            
+
             setOutMessageProperties(message, oi);
             setExchangeProperties(exchange, endpoint, oi);
-            
+
             // setup chain
-    
+
             PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
             message.setInterceptorChain(chain);
-            
+
             modifyChain(chain, reqContext);
             chain.setFaultObserver(outFaultObserver);
-            
+
             // setup conduit selector
             prepareConduitSelector(message);
-            
-            // execute chain        
+
+            // execute chain
             chain.doIntercept(message);
 
         } finally {
             BusFactory.setThreadDefaultBus(origBus);
-        }       
+        }
     }
-    
+
     public Object[] invoke(BindingOperationInfo oi,
-                           Object[] params, 
+                           Object[] params,
                            Map<String, Object> context,
                            Exchange exchange) throws Exception {
         Bus origBus = BusFactory.getThreadDefaultBus(false);
@@ -425,7 +424,7 @@
             }
             exchange.setSynchronous(true);
             Endpoint endpoint = getEndpoint();
-            
+
             Map<String, Object> reqContext = null;
             Map<String, Object> resContext = null;
             if (LOG.isLoggable(Level.FINE)) {
@@ -436,46 +435,46 @@
                 reqContext = CastUtils.cast((Map)context.get(REQUEST_CONTEXT));
                 resContext = CastUtils.cast((Map)context.get(RESPONSE_CONTEXT));
                 message.put(Message.INVOCATION_CONTEXT, context);
-            }    
+            }
             //setup the message context
             setContext(reqContext, message);
             setParameters(params, message);
-    
+
             if (null != reqContext) {
                 exchange.putAll(reqContext);
             }
-            
+
             if (null != oi) {
                 exchange.setOneWay(oi.getOutput() == null);
             }
-    
+
             exchange.setOutMessage(message);
-            
+
             setOutMessageProperties(message, oi);
             setExchangeProperties(exchange, endpoint, oi);
-            
+
             // setup chain
-    
+
             PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
             message.setInterceptorChain(chain);
-            
+
             modifyChain(chain, reqContext);
             chain.setFaultObserver(outFaultObserver);
-            
+
             // setup conduit selector
             prepareConduitSelector(message);
-            
-            // execute chain        
+
+            // execute chain
             chain.doIntercept(message);
-    
+
             return processResult(message, exchange, oi, resContext);
-            
+
         } finally {
             BusFactory.setThreadDefaultBus(origBus);
         }
     }
 
-    protected Object[] processResult(Message message, 
+    protected Object[] processResult(Message message,
                                    Exchange exchange,
                                    BindingOperationInfo oi,
                                    Map<String, Object> resContext) throws Exception {
@@ -496,14 +495,14 @@
             }
             throw ex;
         }
-        
+
         // Wait for a response if we need to
         if (oi != null && !oi.getOperationInfo().isOneWay()) {
             synchronized (exchange) {
                 waitResponse(exchange);
             }
         }
-        
+
         // leave the input stream open for the caller
         Boolean keepConduitAlive = (Boolean)exchange.get(Client.KEEP_CONDUIT_ALIVE);
         if (keepConduitAlive == null || !keepConduitAlive) {
@@ -514,7 +513,7 @@
         List resList = null;
         Message inMsg = exchange.getInMessage();
         if (inMsg != null) {
-            if (null != resContext) {                   
+            if (null != resContext) {
                 resContext.putAll(inMsg);
                 if (LOG.isLoggable(Level.FINE)) {
                     LOG.fine("set responseContext to be" + responseContext);
@@ -522,18 +521,18 @@
             }
             resList = inMsg.getContent(List.class);
         }
-        
+
         // check for an incoming fault
         ex = getException(exchange);
-        
+
         if (ex != null) {
             throw ex;
         }
-        
+
         if (resList != null) {
             return resList.toArray();
         }
-        
+
         return null;
     }
     protected Exception getException(Exchange exchange) {
@@ -541,17 +540,17 @@
             return exchange.getInFaultMessage().getContent(Exception.class);
         } else if (exchange.getOutFaultMessage() != null) {
             return exchange.getOutFaultMessage().getContent(Exception.class);
-        } 
+        }
         return null;
     }
 
     protected void setContext(Map<String, Object> ctx, Message message) {
-        if (ctx != null) {            
+        if (ctx != null) {
             message.putAll(ctx);
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.fine("set requestContext to message be" + ctx);
             }
-        }        
+        }
     }
 
     protected void waitResponse(Exchange exchange) {
@@ -576,24 +575,24 @@
         MessageContentsList contents = new MessageContentsList(params);
         message.setContent(List.class, contents);
     }
-    
+
     public void onMessage(Message message) {
 
         Endpoint endpoint = message.getExchange().get(Endpoint.class);
         if (endpoint == null) {
             // in this case correlation will occur outside the transport,
-            // however there's a possibility that the endpoint may have been 
+            // however there's a possibility that the endpoint may have been
             // rebased in the meantime, so that the response will be mediated
             // via a set of in interceptors provided by a *different* endpoint
             //
             endpoint = getConduitSelector().getEndpoint();
-            message.getExchange().put(Endpoint.class, endpoint);            
+            message.getExchange().put(Endpoint.class, endpoint);
         }
         message = endpoint.getBinding().createMessage(message);
         message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
         message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
         PhaseManager pm = bus.getExtension(PhaseManager.class);
-        
+
         List<Interceptor> i1 = bus.getInInterceptors();
         if (LOG.isLoggable(Level.FINE)) {
             LOG.fine("Interceptors contributed by bus: " + i1);
@@ -610,12 +609,12 @@
         if (LOG.isLoggable(Level.FINE)) {
             LOG.fine("Interceptors contributed by binding: " + i4);
         }
-        
-        PhaseInterceptorChain chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4); 
+
+        PhaseInterceptorChain chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4);
         message.setInterceptorChain(chain);
-        
+
         chain.setFaultObserver(outFaultObserver);
-        
+
         Bus origBus = BusFactory.getThreadDefaultBus(false);
         BusFactory.setThreadDefaultBus(bus);
         // execute chain
@@ -628,7 +627,7 @@
                 }
                 callback.start(message);
             }
-            
+
             String startingAfterInterceptorID = (String) message.get(
                 PhaseInterceptorChain.STARTING_AFTER_INTERCEPTOR_ID);
             String startingInterceptorID = (String) message.get(
@@ -642,9 +641,9 @@
             } else {
                 chain.doIntercept(message);
             }
-            
+
             callback = message.getExchange().get(ClientCallback.class);
-            
+
             if (callback != null && !isPartialResponse(message)) {
                 message.getExchange().setInMessage(message);
                 Map<String, Object> resCtx = CastUtils.cast((Map<?, ?>)message
@@ -652,11 +651,11 @@
                                                                 .getOutMessage()
                                                                 .get(Message.INVOCATION_CONTEXT));
                 resCtx = CastUtils.cast((Map<?, ?>)resCtx.get(RESPONSE_CONTEXT));
-                
+
                 try {
                     Object obj[] = processResult(message, message.getExchange(),
                                                  null, resCtx);
-                                        
+
                     callback.handleResponse(resCtx, obj);
                 } catch (Throwable ex) {
                     callback.handleException(resCtx, ex);
@@ -667,7 +666,7 @@
                 if (!isPartialResponse(message) && callback == null) {
                     message.getExchange().put(FINISHED, Boolean.TRUE);
                     message.getExchange().setInMessage(message);
-                    message.getExchange().notifyAll();                   
+                    message.getExchange().notifyAll();
                 }
             }
             BusFactory.setThreadDefaultBus(origBus);
@@ -695,7 +694,7 @@
             message.put(MessageInfo.class, boi.getOperationInfo().getInput());
         }
     }
-    
+
     protected void setExchangeProperties(Exchange exchange,
                                          Endpoint endpoint,
                                          BindingOperationInfo boi) {
@@ -713,7 +712,7 @@
             exchange.put(BindingOperationInfo.class, boi);
             exchange.put(OperationInfo.class, boi.getOperationInfo());
         }
-                
+
         if (exchange.isSynchronous() || executor == null) {
             exchange.put(MessageObserver.class, this);
         } else {
@@ -725,7 +724,7 @@
                         }
                     });
                 }
-            });            
+            });
         }
         exchange.put(Retryable.class, this);
         exchange.put(Client.class, this);
@@ -758,10 +757,10 @@
         }
     }
 
-    protected PhaseInterceptorChain setupInterceptorChain(Endpoint endpoint) { 
+    protected PhaseInterceptorChain setupInterceptorChain(Endpoint endpoint) {
 
         PhaseManager pm = bus.getExtension(PhaseManager.class);
-        
+
         List<Interceptor> i1 = bus.getOutInterceptors();
         if (LOG.isLoggable(Level.FINE)) {
             LOG.fine("Interceptors contributed by bus: " + i1);
@@ -796,17 +795,17 @@
     public void setSynchronousTimeout(int synchronousTimeout) {
         this.synchronousTimeout = synchronousTimeout;
     }
-    
+
     public final ConduitSelector getConduitSelector() {
         return getConduitSelector(null);
     }
-    
+
     protected final synchronized ConduitSelector getConduitSelector(
         ConduitSelector override
     ) {
         if (null == conduitSelector) {
             setConduitSelector(override != null
-                               ? override 
+                               ? override
                                : new UpfrontConduitSelector());
         }
         return conduitSelector;
@@ -819,13 +818,13 @@
     private boolean isPartialResponse(Message in) {
         return Boolean.TRUE.equals(in.get(Message.PARTIAL_RESPONSE_MESSAGE));
     }
-    
-    
+
+
     /*
      * modification are echoed back to the shared map
      */
     public static class EchoContext extends HashMap<String, Object> {
-        final Map<String, Object> shared; 
+        final Map<String, Object> shared;
         public EchoContext(Map<String, Object> sharedMap) {
             super(sharedMap);
             shared = sharedMap;
@@ -840,12 +839,12 @@
             shared.putAll(t);
             super.putAll(t);
         }
-        
+
         public Object remove(Object key) {
             shared.remove(key);
             return super.remove(key);
         }
-        
+
         public void reload() {
             super.clear();
             super.putAll(shared);