You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2013/12/18 17:50:35 UTC
svn commit: r1552018 - in /cxf/trunk:
core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java
rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
Author: sergeyb
Date: Wed Dec 18 16:50:35 2013
New Revision: 1552018
URL: http://svn.apache.org/r1552018
Log:
Refactoring FailoverTargetSelector a bit to make it simpler to customize, based on the feedback from Jacek Obarymski
Modified:
cxf/trunk/core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java
cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
Modified: cxf/trunk/core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java?rev=1552018&r1=1552017&r2=1552018&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java Wed Dec 18 16:50:35 2013
@@ -77,6 +77,14 @@ public abstract class AbstractConduitSel
}
conduits.clear();
}
+
+ protected void removeConduit(Conduit conduit) {
+ if (conduit != null) {
+ conduit.close();
+ conduits.remove(conduit);
+ }
+ }
+
/**
* Mechanics to actually get the Conduit from the ConduitInitiator
* if necessary.
Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java?rev=1552018&r1=1552017&r2=1552018&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java Wed Dec 18 16:50:35 2013
@@ -50,7 +50,7 @@ public class FailoverTargetSelector exte
private static final Logger LOG =
LogUtils.getL7dLogger(FailoverTargetSelector.class);
protected ConcurrentHashMap<InvocationKey, InvocationContext> inProgress
- = new ConcurrentHashMap<InvocationKey, InvocationContext>();;
+ = new ConcurrentHashMap<InvocationKey, InvocationContext>();
protected FailoverStrategy failoverStrategy;
private boolean supportNotAvailableErrorsOnly = true;
/**
@@ -82,7 +82,7 @@ public class FailoverTargetSelector exte
setupExchangeExceptionProperties(exchange);
InvocationKey key = new InvocationKey(exchange);
- if (!inProgress.containsKey(key)) {
+ if (getInvocationContext(key) == null) {
Endpoint endpoint = exchange.get(Endpoint.class);
BindingOperationInfo bindingOperationInfo =
exchange.getBindingOperationInfo();
@@ -116,6 +116,10 @@ public class FailoverTargetSelector exte
return getSelectedConduit(message);
}
+ protected InvocationContext getInvocationContext(InvocationKey key) {
+ return inProgress.get(key);
+ }
+
/**
* Called on completion of the MEP for which the Conduit was required.
*
@@ -123,62 +127,82 @@ public class FailoverTargetSelector exte
*/
public void complete(Exchange exchange) {
InvocationKey key = new InvocationKey(exchange);
- InvocationContext invocation = null;
- synchronized (this) {
- invocation = inProgress.get(key);
+ InvocationContext invocation = getInvocationContext(key);
+ if (invocation == null) {
+ super.complete(exchange);
+ return;
}
+
boolean failover = false;
- if (invocation != null && requiresFailover(exchange)) {
+ if (requiresFailover(exchange)) {
+ onFailure(invocation);
Conduit old = (Conduit)exchange.getOutMessage().remove(Conduit.class.getName());
Endpoint failoverTarget = getFailoverTarget(exchange, invocation);
if (failoverTarget != null) {
setEndpoint(failoverTarget);
- if (old != null) {
- old.close();
- conduits.remove(old);
- }
- Exception prevExchangeFault =
- (Exception)exchange.remove(Exception.class.getName());
- Message outMessage = exchange.getOutMessage();
- Exception prevMessageFault =
- outMessage.getContent(Exception.class);
- outMessage.setContent(Exception.class, null);
- overrideAddressProperty(invocation.getContext());
- Retryable retry = exchange.get(Retryable.class);
- exchange.clear();
- if (retry != null) {
- try {
- failover = true;
- long delay = getDelayBetweenRetries();
- if (delay > 0) {
- Thread.sleep(delay);
- }
- retry.invoke(invocation.getBindingOperationInfo(),
- invocation.getParams(),
- invocation.getContext(),
- exchange);
- } catch (Exception e) {
- if (exchange.get(Exception.class) != null) {
- exchange.put(Exception.class, prevExchangeFault);
- }
- if (outMessage.getContent(Exception.class) != null) {
- outMessage.setContent(Exception.class,
- prevMessageFault);
- }
- }
- }
+ removeConduit(old);
+ failover = performFailover(exchange, invocation);
} else {
- setEndpoint(invocation.retrieveOriginalEndpoint(endpoint));
+ setOriginalEndpoint(invocation);
}
+ } else {
+ getLogger().fine("FAILOVER_NOT_REQUIRED");
+ onSuccess(invocation);
}
+
if (!failover) {
- getLogger().fine("FAILOVER_NOT_REQUIRED");
- synchronized (this) {
- inProgress.remove(key);
+ inProgress.remove(key);
+ doComplete(exchange);
+ }
+ }
+
+ protected void doComplete(Exchange exchange) {
+ super.complete(exchange);
+ }
+
+ protected void setOriginalEndpoint(InvocationContext invocation) {
+ setEndpoint(invocation.retrieveOriginalEndpoint(endpoint));
+ }
+
+ protected boolean performFailover(Exchange exchange, InvocationContext invocation) {
+ Exception prevExchangeFault = (Exception)exchange.remove(Exception.class.getName());
+ Message outMessage = exchange.getOutMessage();
+ Exception prevMessageFault = outMessage.getContent(Exception.class);
+ outMessage.setContent(Exception.class, null);
+ overrideAddressProperty(invocation.getContext());
+
+ Retryable retry = exchange.get(Retryable.class);
+ exchange.clear();
+ boolean failover = false;
+ if (retry != null) {
+ try {
+ failover = true;
+ long delay = getDelayBetweenRetries();
+ if (delay > 0) {
+ Thread.sleep(delay);
+ }
+ retry.invoke(invocation.getBindingOperationInfo(),
+ invocation.getParams(),
+ invocation.getContext(),
+ exchange);
+ } catch (Exception e) {
+ if (exchange.get(Exception.class) != null) {
+ exchange.put(Exception.class, prevExchangeFault);
+ }
+ if (outMessage.getContent(Exception.class) != null) {
+ outMessage.setContent(Exception.class,
+ prevMessageFault);
+ }
}
- super.complete(exchange);
}
+ return failover;
+ }
+
+ protected void onSuccess(InvocationContext context) {
+ }
+
+ protected void onFailure(InvocationContext context) {
}
/**