You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2018/04/27 16:32:09 UTC
[cxf] branch master updated: [CXF-7591, CXF-7710] More updates to the response context handling
This is an automated email from the ASF dual-hosted git repository.
dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new 6abad18 [CXF-7591, CXF-7710] More updates to the response context handling
6abad18 is described below
commit 6abad1864187457984b0dff2622eafb4b5df5305
Author: Daniel Kulp <dk...@apache.org>
AuthorDate: Fri Apr 27 12:31:09 2018 -0400
[CXF-7591, CXF-7710] More updates to the response context handling
---
.../main/java/org/apache/cxf/endpoint/Client.java | 28 ++++
.../java/org/apache/cxf/endpoint/ClientImpl.java | 185 +++++++++++++--------
2 files changed, 147 insertions(+), 66 deletions(-)
diff --git a/core/src/main/java/org/apache/cxf/endpoint/Client.java b/core/src/main/java/org/apache/cxf/endpoint/Client.java
index 27ec1b3..c06b7f6 100644
--- a/core/src/main/java/org/apache/cxf/endpoint/Client.java
+++ b/core/src/main/java/org/apache/cxf/endpoint/Client.java
@@ -239,6 +239,34 @@ public interface Client extends InterceptorProvider, MessageObserver, ConduitSel
* @return true if the request context is a thread local
*/
boolean isThreadLocalRequestContext();
+
+
+ /**
+ * Wraps the contexts in a way that allows the contexts
+ * to be cleared and released in a try-with-resources block
+ */
+ interface Contexts extends AutoCloseable {
+ Map<String, Object> getRequestContext();
+ Map<String, Object> getResponseContext();
+ }
+
+ default Contexts getContexts() {
+ return new Contexts() {
+ @Override
+ public void close() throws Exception {
+ getRequestContext().clear();
+ getResponseContext().clear();
+ }
+ @Override
+ public Map<String, Object> getRequestContext() {
+ return Client.this.getRequestContext();
+ }
+ @Override
+ public Map<String, Object> getResponseContext() {
+ return Client.this.getResponseContext();
+ }
+ };
+ }
Endpoint getEndpoint();
diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
index 6beecac..799c26e 100644
--- a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
+++ b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
@@ -96,11 +96,11 @@ public class ClientImpl
protected Map<String, Object> currentRequestContext = new ConcurrentHashMap<String, Object>(8, 0.75f, 4);
protected Thread latestContextThread;
- protected Map<Thread, Map<String, Object>> requestContext
- = Collections.synchronizedMap(new WeakHashMap<Thread, Map<String, Object>>());
+ protected Map<Thread, EchoContext> requestContext
+ = Collections.synchronizedMap(new WeakHashMap<Thread, EchoContext>());
- protected Map<Thread, Map<String, Object>> responseContext
- = Collections.synchronizedMap(new WeakHashMap<Thread, Map<String, Object>>());
+ protected Map<Thread, ResponseContext> responseContext
+ = Collections.synchronizedMap(new WeakHashMap<Thread, ResponseContext>());
protected Executor executor;
@@ -237,12 +237,33 @@ public class ClientImpl
return getConduitSelector().getEndpoint();
}
-
+ public void releaseThreadContexts() {
+ final Thread t = Thread.currentThread();
+ requestContext.remove(t);
+ responseContext.remove(t);
+ }
+
+ public Contexts getContexts() {
+ return new Contexts() {
+ @Override
+ public void close() throws Exception {
+ releaseThreadContexts();
+ }
+ @Override
+ public Map<String, Object> getRequestContext() {
+ return ClientImpl.this.getRequestContext();
+ }
+ @Override
+ public Map<String, Object> getResponseContext() {
+ return ClientImpl.this.getResponseContext();
+ }
+ };
+ }
public Map<String, Object> getRequestContext() {
if (isThreadLocalRequestContext()) {
final Thread t = Thread.currentThread();
if (!requestContext.containsKey(t)) {
- Map<String, Object> freshRequestContext = new EchoContext(currentRequestContext);
+ EchoContext freshRequestContext = new EchoContext(currentRequestContext, requestContext);
requestContext.put(t, freshRequestContext);
}
latestContextThread = t;
@@ -251,28 +272,30 @@ public class ClientImpl
return currentRequestContext;
}
public Map<String, Object> getResponseContext() {
- if (!responseContext.containsKey(Thread.currentThread())) {
- final Thread t = Thread.currentThread();
- responseContext.put(t, new HashMap<String, Object>() {
- private static final long serialVersionUID = 1L;
- @Override
- public void clear() {
- super.clear();
- try {
- for (Map.Entry<Thread, Map<String, Object>> ent : responseContext.entrySet()) {
- if (ent.getValue() == this) {
- responseContext.remove(ent.getKey());
- return;
- }
- }
- } catch (Throwable t) {
- //ignore
- }
- }
- });
+ final Thread t = Thread.currentThread();
+ ResponseContext ret = responseContext.get(t);
+ if (ret == null) {
+ ret = new ResponseContext();
+ responseContext.put(t, ret);
}
- return responseContext.get(Thread.currentThread());
-
+ return ret;
+ }
+ protected Map<String, Object> newResponseContext() {
+ final Thread t = Thread.currentThread();
+ ResponseContext ret = new ResponseContext();
+ responseContext.put(t, ret);
+ return ret;
+ }
+ protected Map<String, Object> reloadResponseContext(Map<String, Object> o) {
+ final Thread t = Thread.currentThread();
+ ResponseContext ctx = responseContext.get(t);
+ if (ctx == null) {
+ ctx = new ResponseContext(o);
+ responseContext.put(t, ctx);
+ } else if (o != ctx) {
+ ctx.reload(o);
+ }
+ return ctx;
}
public boolean isThreadLocalRequestContext() {
Object o = currentRequestContext.get(THREAD_LOCAL_REQUEST_CONTEXT);
@@ -335,31 +358,12 @@ public class ClientImpl
Object[] params,
Exchange exchange) throws Exception {
Map<String, Object> context = new HashMap<>();
- Map<String, Object> resp = new HashMap<>();
- Map<String, Object> req = new HashMap<>(getRequestContext());
- context.put(RESPONSE_CONTEXT, resp);
- context.put(REQUEST_CONTEXT, req);
- try {
- return invoke(oi, params, context, exchange);
- } finally {
- if (responseContext != null) {
- responseContext.put(Thread.currentThread(), resp);
- }
- }
+ return invoke(oi, params, context, exchange);
}
public Object[] invoke(BindingOperationInfo oi,
Object[] params,
Map<String, Object> context) throws Exception {
- try {
- return invoke(oi, params, context, (Exchange)null);
- } finally {
- if (context != null) {
- Map<String, Object> resp = CastUtils.cast((Map<?, ?>)context.get(RESPONSE_CONTEXT));
- if (resp != null && responseContext != null) {
- responseContext.put(Thread.currentThread(), resp);
- }
- }
- }
+ return invoke(oi, params, context, (Exchange)null);
}
public void invoke(ClientCallback callback,
@@ -449,6 +453,7 @@ public class ClientImpl
Exchange exchange) throws Exception {
Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
ClassLoaderHolder origLoader = null;
+ Map<String, Object> resContext = null;
try {
ClassLoader loader = bus.getExtension(ClassLoader.class);
if (loader != null) {
@@ -467,7 +472,6 @@ public class ClientImpl
// Make sure INVOCATION CONTEXT, REQUEST_CONTEXT and RESPONSE_CONTEXT are present
// on message
Map<String, Object> reqContext = null;
- Map<String, Object> resContext = null;
if (context == null) {
context = new HashMap<>();
}
@@ -478,7 +482,7 @@ public class ClientImpl
context.put(REQUEST_CONTEXT, reqContext);
}
if (resContext == null) {
- resContext = new HashMap<>();
+ resContext = newResponseContext();
context.put(RESPONSE_CONTEXT, resContext);
}
@@ -514,7 +518,7 @@ public class ClientImpl
// handle the right response
List<Object> resList = null;
Message inMsg = message.getExchange().getInMessage();
- Map<String, Object> ctx = responseContext.get(Thread.currentThread());
+ Map<String, Object> ctx = getResponseContext();
resList = CastUtils.cast(inMsg.getContent(List.class));
Object[] result = resList == null ? null : resList.toArray();
callback.handleResponse(ctx, result);
@@ -541,6 +545,9 @@ public class ClientImpl
}
return processResult(message, exchange, oi, resContext);
} finally {
+ if (callback == null) {
+ reloadResponseContext(resContext);
+ }
if (origLoader != null) {
origLoader.reset();
}
@@ -639,7 +646,7 @@ public class ClientImpl
resContext.putAll(inMsg);
// remove the recursive reference if present
resContext.remove(Message.INVOCATION_CONTEXT);
- responseContext.put(Thread.currentThread(), resContext);
+ reloadResponseContext(resContext);
}
resList = CastUtils.cast(inMsg.getContent(List.class));
}
@@ -814,9 +821,8 @@ public class ClientImpl
Message.INVOCATION_CONTEXT));
resCtx = CastUtils.cast((Map<?, ?>) resCtx
.get(RESPONSE_CONTEXT));
- if (resCtx != null) {
- responseContext.put(Thread.currentThread(), resCtx);
- }
+ resCtx = reloadResponseContext(resCtx);
+
// remove callback so that it won't be invoked twice
callback = message.getExchange().remove(ClientCallback.class);
if (callback != null) {
@@ -843,15 +849,14 @@ public class ClientImpl
.getOutMessage()
.get(Message.INVOCATION_CONTEXT));
resCtx = CastUtils.cast((Map<?, ?>)resCtx.get(RESPONSE_CONTEXT));
- if (resCtx != null && responseContext != null) {
- responseContext.put(Thread.currentThread(), resCtx);
- }
try {
Object obj[] = processResult(message, message.getExchange(),
null, resCtx);
+ resCtx = reloadResponseContext(resCtx);
callback.handleResponse(resCtx, obj);
} catch (Throwable ex) {
+ resCtx = reloadResponseContext(resCtx);
callback.handleException(resCtx, ex);
}
}
@@ -1052,28 +1057,40 @@ public class ClientImpl
}
- /*
- * modification are echoed back to the shared map
- */
public class EchoContext extends ConcurrentHashMap<String, Object> {
private static final long serialVersionUID = 1L;
- public EchoContext(Map<String, Object> sharedMap) {
+
+ final Map<Thread, EchoContext> context;
+ public EchoContext(Map<String, Object> sharedMap, Map<Thread, EchoContext> ctx) {
+ super(8, 0.75f, 4);
+ if (sharedMap != null) {
+ super.putAll(sharedMap);
+ }
+ context = ctx;
+ }
+
+ public EchoContext(Map<Thread, EchoContext> ctx) {
super(8, 0.75f, 4);
- putAll(sharedMap);
+ context = ctx;
}
public void reload() {
+ reload(context.get(latestContextThread));
+ }
+ public void reload(Map<String, Object> content) {
super.clear();
- super.putAll(requestContext.get(latestContextThread));
+ if (content != null) {
+ putAll(content);
+ }
}
@Override
public void clear() {
super.clear();
try {
- for (Map.Entry<Thread, Map<String, Object>> ent : requestContext.entrySet()) {
+ for (Map.Entry<Thread, EchoContext> ent : context.entrySet()) {
if (ent.getValue() == this) {
- requestContext.remove(ent.getKey());
+ context.remove(ent.getKey());
return;
}
}
@@ -1083,6 +1100,42 @@ public class ClientImpl
}
}
+ /**
+ * Class to handle the response contexts. The clear method is overridden to remove
+ * this context from the per-thread caches in the ClientImpl
+ */
+ class ResponseContext extends HashMap<String, Object> {
+ private static final long serialVersionUID = 1L;
+
+ ResponseContext(Map<String, Object> origMap) {
+ super(origMap);
+ }
+
+ ResponseContext() {
+ }
+
+ public void reload(Map<String, Object> content) {
+ super.clear();
+ if (content != null) {
+ putAll(content);
+ }
+ }
+
+ @Override
+ public void clear() {
+ super.clear();
+ try {
+ for (Map.Entry<Thread, ResponseContext> ent : responseContext.entrySet()) {
+ if (ent.getValue() == this) {
+ responseContext.remove(ent.getKey());
+ return;
+ }
+ }
+ } catch (Throwable t) {
+ //ignore
+ }
+ }
+ }
public void setExecutor(Executor executor) {
if (!SynchronousExecutor.isA(executor)) {
--
To stop receiving notification emails like this one, please contact
dkulp@apache.org.