You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2018/04/27 16:32:09 UTC

[cxf] branch master updated: [CXF-7591, CXF-7710] More updates to the response context handling

This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 6abad18  [CXF-7591, CXF-7710] More updates to the response context handling
6abad18 is described below

commit 6abad1864187457984b0dff2622eafb4b5df5305
Author: Daniel Kulp <dk...@apache.org>
AuthorDate: Fri Apr 27 12:31:09 2018 -0400

    [CXF-7591, CXF-7710] More updates to the response context handling
---
 .../main/java/org/apache/cxf/endpoint/Client.java  |  28 ++++
 .../java/org/apache/cxf/endpoint/ClientImpl.java   | 185 +++++++++++++--------
 2 files changed, 147 insertions(+), 66 deletions(-)

diff --git a/core/src/main/java/org/apache/cxf/endpoint/Client.java b/core/src/main/java/org/apache/cxf/endpoint/Client.java
index 27ec1b3..c06b7f6 100644
--- a/core/src/main/java/org/apache/cxf/endpoint/Client.java
+++ b/core/src/main/java/org/apache/cxf/endpoint/Client.java
@@ -239,6 +239,34 @@ public interface Client extends InterceptorProvider, MessageObserver, ConduitSel
      * @return true if the request context is a thread local
      */
     boolean isThreadLocalRequestContext();
+    
+    
+    /**
+     * Wraps the request and response contexts in a way that allows them
+     * to be cleared and released in a try-with-resources block.
+     */
+    interface Contexts extends AutoCloseable {
+        Map<String, Object> getRequestContext();
+        Map<String, Object> getResponseContext();
+    }
+    
+    default Contexts getContexts() {
+        return new Contexts() {
+            @Override
+            public void close() throws Exception {
+                getRequestContext().clear();
+                getResponseContext().clear();
+            }
+            @Override
+            public Map<String, Object> getRequestContext() {
+                return Client.this.getRequestContext();
+            }
+            @Override
+            public Map<String, Object> getResponseContext() {
+                return Client.this.getResponseContext();
+            }
+        };
+    }
 
 
     Endpoint getEndpoint();
diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
index 6beecac..799c26e 100644
--- a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
+++ b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
@@ -96,11 +96,11 @@ public class ClientImpl
 
     protected Map<String, Object> currentRequestContext = new ConcurrentHashMap<String, Object>(8, 0.75f, 4);
     protected Thread latestContextThread;
-    protected Map<Thread, Map<String, Object>> requestContext
-        = Collections.synchronizedMap(new WeakHashMap<Thread, Map<String, Object>>());
+    protected Map<Thread, EchoContext> requestContext
+        = Collections.synchronizedMap(new WeakHashMap<Thread, EchoContext>());
 
-    protected Map<Thread, Map<String, Object>> responseContext
-        = Collections.synchronizedMap(new WeakHashMap<Thread, Map<String, Object>>());
+    protected Map<Thread, ResponseContext> responseContext
+        = Collections.synchronizedMap(new WeakHashMap<Thread, ResponseContext>());
 
     protected Executor executor;
 
@@ -237,12 +237,33 @@ public class ClientImpl
         return getConduitSelector().getEndpoint();
     }
 
-
+    public void releaseThreadContexts() {
+        final Thread t = Thread.currentThread();
+        requestContext.remove(t);
+        responseContext.remove(t);
+    }
+    
+    public Contexts getContexts() {
+        return new Contexts() {
+            @Override
+            public void close() throws Exception {
+                releaseThreadContexts();
+            }
+            @Override
+            public Map<String, Object> getRequestContext() {
+                return ClientImpl.this.getRequestContext();
+            }
+            @Override
+            public Map<String, Object> getResponseContext() {
+                return ClientImpl.this.getResponseContext();
+            }            
+        };
+    }
     public Map<String, Object> getRequestContext() {
         if (isThreadLocalRequestContext()) {
             final Thread t = Thread.currentThread();
             if (!requestContext.containsKey(t)) {
-                Map<String, Object> freshRequestContext = new EchoContext(currentRequestContext);
+                EchoContext freshRequestContext = new EchoContext(currentRequestContext, requestContext);
                 requestContext.put(t, freshRequestContext);
             }
             latestContextThread = t;
@@ -251,28 +272,30 @@ public class ClientImpl
         return currentRequestContext;
     }
     public Map<String, Object> getResponseContext() {
-        if (!responseContext.containsKey(Thread.currentThread())) {
-            final Thread t = Thread.currentThread();
-            responseContext.put(t, new HashMap<String, Object>() {
-                private static final long serialVersionUID = 1L;
-                @Override
-                public void clear() {
-                    super.clear();
-                    try {
-                        for (Map.Entry<Thread, Map<String, Object>> ent : responseContext.entrySet()) {
-                            if (ent.getValue() == this) {
-                                responseContext.remove(ent.getKey());
-                                return;
-                            }
-                        }
-                    } catch (Throwable t) {
-                        //ignore
-                    }
-                }
-            });
+        final Thread t = Thread.currentThread();
+        ResponseContext ret = responseContext.get(t);
+        if (ret == null) {
+            ret = new ResponseContext();
+            responseContext.put(t, ret);
         }
-        return responseContext.get(Thread.currentThread());
-
+        return ret;
+    }
+    protected Map<String, Object> newResponseContext() {
+        final Thread t = Thread.currentThread();
+        ResponseContext ret = new ResponseContext();
+        responseContext.put(t, ret);
+        return ret;
+    }
+    protected Map<String, Object> reloadResponseContext(Map<String, Object> o) {
+        final Thread t = Thread.currentThread();
+        ResponseContext ctx = responseContext.get(t);
+        if (ctx == null) {
+            ctx = new ResponseContext(o);
+            responseContext.put(t, ctx);
+        } else if (o != ctx) {
+            ctx.reload(o);
+        }
+        return ctx;
     }
     public boolean isThreadLocalRequestContext() {
         Object o = currentRequestContext.get(THREAD_LOCAL_REQUEST_CONTEXT);
@@ -335,31 +358,12 @@ public class ClientImpl
                            Object[] params,
                            Exchange exchange) throws Exception {
         Map<String, Object> context = new HashMap<>();
-        Map<String, Object> resp = new HashMap<>();
-        Map<String, Object> req = new HashMap<>(getRequestContext());
-        context.put(RESPONSE_CONTEXT, resp);
-        context.put(REQUEST_CONTEXT, req);
-        try {
-            return invoke(oi, params, context, exchange);
-        } finally {
-            if (responseContext != null) {
-                responseContext.put(Thread.currentThread(), resp);
-            }
-        }
+        return invoke(oi, params, context, exchange);
     }
     public Object[] invoke(BindingOperationInfo oi,
                            Object[] params,
                            Map<String, Object> context) throws Exception {
-        try {
-            return invoke(oi, params, context, (Exchange)null);
-        } finally {
-            if (context != null) {
-                Map<String, Object> resp = CastUtils.cast((Map<?, ?>)context.get(RESPONSE_CONTEXT));
-                if (resp != null && responseContext != null) {
-                    responseContext.put(Thread.currentThread(), resp);
-                }
-            }
-        }
+        return invoke(oi, params, context, (Exchange)null);
     }
 
     public void invoke(ClientCallback callback,
@@ -449,6 +453,7 @@ public class ClientImpl
                               Exchange exchange) throws Exception {
         Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
         ClassLoaderHolder origLoader = null;
+        Map<String, Object> resContext = null;
         try {
             ClassLoader loader = bus.getExtension(ClassLoader.class);
             if (loader != null) {
@@ -467,7 +472,6 @@ public class ClientImpl
             // Make sure INVOCATION CONTEXT, REQUEST_CONTEXT and RESPONSE_CONTEXT are present
             // on message
             Map<String, Object> reqContext = null;
-            Map<String, Object> resContext = null;
             if (context == null) {
                 context = new HashMap<>();
             }
@@ -478,7 +482,7 @@ public class ClientImpl
                 context.put(REQUEST_CONTEXT, reqContext);
             }
             if (resContext == null) {
-                resContext = new HashMap<>();
+                resContext = newResponseContext();
                 context.put(RESPONSE_CONTEXT, resContext);
             }
 
@@ -514,7 +518,7 @@ public class ClientImpl
                                 // handle the right response
                                 List<Object> resList = null;
                                 Message inMsg = message.getExchange().getInMessage();
-                                Map<String, Object> ctx = responseContext.get(Thread.currentThread());
+                                Map<String, Object> ctx = getResponseContext();
                                 resList = CastUtils.cast(inMsg.getContent(List.class));
                                 Object[] result = resList == null ? null : resList.toArray();
                                 callback.handleResponse(ctx, result);
@@ -541,6 +545,9 @@ public class ClientImpl
             }
             return processResult(message, exchange, oi, resContext);
         } finally {
+            if (callback == null) {
+                reloadResponseContext(resContext);
+            }
             if (origLoader != null) {
                 origLoader.reset();
             }
@@ -639,7 +646,7 @@ public class ClientImpl
                 resContext.putAll(inMsg);
                 // remove the recursive reference if present
                 resContext.remove(Message.INVOCATION_CONTEXT);
-                responseContext.put(Thread.currentThread(), resContext);
+                reloadResponseContext(resContext);
             }
             resList = CastUtils.cast(inMsg.getContent(List.class));
         }
@@ -814,9 +821,8 @@ public class ClientImpl
                                                 Message.INVOCATION_CONTEXT));
                         resCtx = CastUtils.cast((Map<?, ?>) resCtx
                                 .get(RESPONSE_CONTEXT));
-                        if (resCtx != null) {
-                            responseContext.put(Thread.currentThread(), resCtx);
-                        }
+                        resCtx = reloadResponseContext(resCtx);
+
                         // remove callback so that it won't be invoked twice
                         callback = message.getExchange().remove(ClientCallback.class);
                         if (callback != null) {
@@ -843,15 +849,14 @@ public class ClientImpl
                                                                 .getOutMessage()
                                                                 .get(Message.INVOCATION_CONTEXT));
                 resCtx = CastUtils.cast((Map<?, ?>)resCtx.get(RESPONSE_CONTEXT));
-                if (resCtx != null && responseContext != null) {
-                    responseContext.put(Thread.currentThread(), resCtx);
-                }
                 try {
                     Object obj[] = processResult(message, message.getExchange(),
                                                  null, resCtx);
 
+                    resCtx = reloadResponseContext(resCtx);
                     callback.handleResponse(resCtx, obj);
                 } catch (Throwable ex) {
+                    resCtx = reloadResponseContext(resCtx);
                     callback.handleException(resCtx, ex);
                 }
             }
@@ -1052,28 +1057,40 @@ public class ClientImpl
     }
 
 
-    /*
-     * modification are echoed back to the shared map
-     */
     public class EchoContext extends ConcurrentHashMap<String, Object> {
         private static final long serialVersionUID = 1L;
-        public EchoContext(Map<String, Object> sharedMap) {
+        
+        final Map<Thread, EchoContext> context;
+        public EchoContext(Map<String, Object> sharedMap, Map<Thread, EchoContext> ctx) {
+            super(8, 0.75f, 4);
+            if (sharedMap != null) {
+                super.putAll(sharedMap);
+            }
+            context = ctx;
+        }
+
+        public EchoContext(Map<Thread, EchoContext> ctx) {
             super(8, 0.75f, 4);
-            putAll(sharedMap);
+            context = ctx;
         }
 
         public void reload() {
+            reload(context.get(latestContextThread));
+        }
+        public void reload(Map<String, Object> content) {
             super.clear();
-            super.putAll(requestContext.get(latestContextThread));
+            if (content != null) {
+                putAll(content);
+            }
         }
         
         @Override
         public void clear() {
             super.clear();
             try {
-                for (Map.Entry<Thread, Map<String, Object>> ent : requestContext.entrySet()) {
+                for (Map.Entry<Thread, EchoContext> ent : context.entrySet()) {
                     if (ent.getValue() == this) {
-                        requestContext.remove(ent.getKey());
+                        context.remove(ent.getKey());
                         return;
                     }
                 }
@@ -1083,6 +1100,42 @@ public class ClientImpl
         }
     }
 
+    /** 
+     * Class to handle the response contexts.  clear() is overridden to also remove
+     * this context from the thread-keyed responseContext map in the ClientImpl.
+     */
+    class ResponseContext extends HashMap<String, Object> {
+        private static final long serialVersionUID = 1L;
+        
+        ResponseContext(Map<String, Object> origMap) {
+            super(origMap);
+        }
+
+        ResponseContext() {
+        }
+
+        public void reload(Map<String, Object> content) {
+            super.clear();
+            if (content != null) {
+                putAll(content);
+            }
+        }
+        
+        @Override
+        public void clear() {
+            super.clear();
+            try {
+                for (Map.Entry<Thread, ResponseContext> ent : responseContext.entrySet()) {
+                    if (ent.getValue() == this) {
+                        responseContext.remove(ent.getKey());
+                        return;
+                    }
+                }
+            } catch (Throwable t) {
+                //ignore
+            }
+        }
+    }
 
     public void setExecutor(Executor executor) {
         if (!SynchronousExecutor.isA(executor)) {

-- 
To stop receiving notification emails like this one, please contact
dkulp@apache.org.