You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2013/12/18 17:50:35 UTC

svn commit: r1552018 - in /cxf/trunk: core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java

Author: sergeyb
Date: Wed Dec 18 16:50:35 2013
New Revision: 1552018

URL: http://svn.apache.org/r1552018
Log:
Refactoring FailoverTargetSelector a bit to make it simpler to customize, based on the feedback from Jacek Obarymski

Modified:
    cxf/trunk/core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java
    cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java

Modified: cxf/trunk/core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java?rev=1552018&r1=1552017&r2=1552018&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/endpoint/AbstractConduitSelector.java Wed Dec 18 16:50:35 2013
@@ -77,6 +77,14 @@ public abstract class AbstractConduitSel
         }
         conduits.clear();
     }
+    
+    protected void removeConduit(Conduit conduit) {
+        if (conduit != null) {
+            conduit.close();
+            conduits.remove(conduit);
+        }
+    }
+    
     /**
      * Mechanics to actually get the Conduit from the ConduitInitiator
      * if necessary.

Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java?rev=1552018&r1=1552017&r2=1552018&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java Wed Dec 18 16:50:35 2013
@@ -50,7 +50,7 @@ public class FailoverTargetSelector exte
     private static final Logger LOG =
         LogUtils.getL7dLogger(FailoverTargetSelector.class);
     protected ConcurrentHashMap<InvocationKey, InvocationContext> inProgress 
-        = new ConcurrentHashMap<InvocationKey, InvocationContext>();;
+        = new ConcurrentHashMap<InvocationKey, InvocationContext>();
     protected FailoverStrategy failoverStrategy;
     private boolean supportNotAvailableErrorsOnly = true;
     /**
@@ -82,7 +82,7 @@ public class FailoverTargetSelector exte
         setupExchangeExceptionProperties(exchange);
         
         InvocationKey key = new InvocationKey(exchange);
-        if (!inProgress.containsKey(key)) {
+        if (getInvocationContext(key) == null) {
             Endpoint endpoint = exchange.get(Endpoint.class);
             BindingOperationInfo bindingOperationInfo =
                 exchange.getBindingOperationInfo();
@@ -116,6 +116,10 @@ public class FailoverTargetSelector exte
         return getSelectedConduit(message);
     }
 
+    protected InvocationContext getInvocationContext(InvocationKey key) { 
+        return inProgress.get(key);
+    }
+    
     /**
      * Called on completion of the MEP for which the Conduit was required.
      * 
@@ -123,62 +127,82 @@ public class FailoverTargetSelector exte
      */
     public void complete(Exchange exchange) {
         InvocationKey key = new InvocationKey(exchange);
-        InvocationContext invocation = null;
-        synchronized (this) {
-            invocation = inProgress.get(key);
+        InvocationContext invocation = getInvocationContext(key);
+        if (invocation == null) {
+            super.complete(exchange);
+            return;
         }
+        
         boolean failover = false;
-        if (invocation != null && requiresFailover(exchange)) {
+        if (requiresFailover(exchange)) {
+            onFailure(invocation);
             Conduit old = (Conduit)exchange.getOutMessage().remove(Conduit.class.getName());
             
             Endpoint failoverTarget = getFailoverTarget(exchange, invocation);
             if (failoverTarget != null) {
                 setEndpoint(failoverTarget);
-                if (old != null) {
-                    old.close();
-                    conduits.remove(old);
-                }
-                Exception prevExchangeFault =
-                    (Exception)exchange.remove(Exception.class.getName());
-                Message outMessage = exchange.getOutMessage();
-                Exception prevMessageFault =
-                    outMessage.getContent(Exception.class);
-                outMessage.setContent(Exception.class, null);
-                overrideAddressProperty(invocation.getContext());
-                Retryable retry = exchange.get(Retryable.class);
-                exchange.clear();
-                if (retry != null) {
-                    try {
-                        failover = true;
-                        long delay = getDelayBetweenRetries();
-                        if (delay > 0) {
-                            Thread.sleep(delay);
-                        }
-                        retry.invoke(invocation.getBindingOperationInfo(),
-                                     invocation.getParams(),
-                                     invocation.getContext(),
-                                     exchange);
-                    } catch (Exception e) {
-                        if (exchange.get(Exception.class) != null) {
-                            exchange.put(Exception.class, prevExchangeFault);
-                        }
-                        if (outMessage.getContent(Exception.class) != null) {
-                            outMessage.setContent(Exception.class,
-                                                  prevMessageFault);
-                        }
-                    }
-                }
+                removeConduit(old);
+                failover = performFailover(exchange, invocation);
             } else {
-                setEndpoint(invocation.retrieveOriginalEndpoint(endpoint));
+                setOriginalEndpoint(invocation);
             }
+        } else {
+            getLogger().fine("FAILOVER_NOT_REQUIRED");
+            onSuccess(invocation);
         }
+        
         if (!failover) {
-            getLogger().fine("FAILOVER_NOT_REQUIRED");
-            synchronized (this) {
-                inProgress.remove(key);
+            inProgress.remove(key);
+            doComplete(exchange);
+        }
+    }
+    
+    protected void doComplete(Exchange exchange) {
+        super.complete(exchange);
+    }
+    
+    protected void setOriginalEndpoint(InvocationContext invocation) {
+        setEndpoint(invocation.retrieveOriginalEndpoint(endpoint));
+    }
+    
+    protected boolean performFailover(Exchange exchange, InvocationContext invocation) {
+        Exception prevExchangeFault = (Exception)exchange.remove(Exception.class.getName());
+        Message outMessage = exchange.getOutMessage();
+        Exception prevMessageFault = outMessage.getContent(Exception.class);
+        outMessage.setContent(Exception.class, null);
+        overrideAddressProperty(invocation.getContext());
+        
+        Retryable retry = exchange.get(Retryable.class);
+        exchange.clear();
+        boolean failover = false;
+        if (retry != null) {
+            try {
+                failover = true;
+                long delay = getDelayBetweenRetries();
+                if (delay > 0) {
+                    Thread.sleep(delay);
+                }
+                retry.invoke(invocation.getBindingOperationInfo(),
+                             invocation.getParams(),
+                             invocation.getContext(),
+                             exchange);
+            } catch (Exception e) {
+                if (exchange.get(Exception.class) != null) {
+                    exchange.put(Exception.class, prevExchangeFault);
+                }
+                if (outMessage.getContent(Exception.class) != null) {
+                    outMessage.setContent(Exception.class,
+                                          prevMessageFault);
+                }
             }
-            super.complete(exchange);
         }
+        return failover;
+    }
+    
+    protected void onSuccess(InvocationContext context) {
+    }
+    
+    protected void onFailure(InvocationContext context) {
     }
     
     /**