You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2015/11/19 03:05:01 UTC

cxf git commit: CXF-6622: Enhance Failover Feature to support Circuit Breakers based implementation. Refactored conduit selection logic to respect endpoint status.

Repository: cxf
Updated Branches:
  refs/heads/master ce969249d -> 248c8f045


CXF-6622: Enhance Failover Feature to support Circuit Breakers based implementation. Refactored conduit selection logic to respect endpoint status.


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/248c8f04
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/248c8f04
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/248c8f04

Branch: refs/heads/master
Commit: 248c8f0458ce938a65c35006d484c6a4610cde1b
Parents: ce96924
Author: reta <dr...@gmail.com>
Authored: Wed Nov 18 21:04:35 2015 -0500
Committer: reta <dr...@gmail.com>
Committed: Wed Nov 18 21:04:35 2015 -0500

----------------------------------------------------------------------
 .../CircuitBreakerTargetSelector.java           | 68 ++++++++++++++++++++
 .../cxf/clustering/FailoverFailedException.java | 37 +++++++++++
 .../failover/CircuitBreakerFailoverTest.java    | 29 +++++++++
 3 files changed, 134 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/248c8f04/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
----------------------------------------------------------------------
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
index 86aabaa..62541d9 100644
--- a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
@@ -33,13 +33,16 @@ import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Conduit;
 
 public class CircuitBreakerTargetSelector extends FailoverTargetSelector {
     public static final int DEFAULT_TIMEOUT = 1000 * 60 /* 1 minute timeout as default */;
     public static final int DEFAULT_THESHOLD = 1;
 
+    private static final String IS_SELECTED = "org.apache.cxf.clustering.CircuitBreakerTargetSelector.IS_SELECTED";
     private static final Logger LOG = LogUtils.getL7dLogger(CircuitBreakerTargetSelector.class);
     
     private final int threshold;
@@ -60,6 +63,14 @@ public class CircuitBreakerTargetSelector extends FailoverTargetSelector {
     public synchronized void setStrategy(FailoverStrategy strategy) {
         super.setStrategy(strategy);
         
+        // Registering the original endpoint in the list of circuit breakers
+        if (getEndpoint() != null) {
+            circuits.putIfAbsent(
+                getEndpoint().getEndpointInfo().getAddress(), 
+                new ZestCircuitBreaker(threshold, timeout)
+            );
+        }
+        
         if (strategy != null) {
             for (String alternative: strategy.getAlternateAddresses(null /* no Exchange at this point */)) {
                 if (!StringUtils.isEmpty(alternative)) {
@@ -71,6 +82,37 @@ public class CircuitBreakerTargetSelector extends FailoverTargetSelector {
             }
         }
     }
+    @Override
+    public synchronized Conduit selectConduit(Message message) {
+        Conduit c = message.get(Conduit.class);
+        if (c != null) {
+            return c;
+        }
+        Exchange exchange = message.getExchange();
+        InvocationKey key = new InvocationKey(exchange);
+        InvocationContext invocation = inProgress.get(key);
+        if (invocation != null && !invocation.getContext().containsKey(IS_SELECTED)) {
+            final String address = (String) message.get(Message.ENDPOINT_ADDRESS);
+            
+            if (isFailoverRequired(address)) {
+                Endpoint target = getFailoverTarget(exchange, invocation);
+                
+                if (target == null) {
+                    throw new Fault(new FailoverFailedException(
+                        "None of alternative addresses are available at the moment"));
+                }
+
+                if (isEndpointChanged(address, target)) {
+                    setEndpoint(target);
+                    message.put(Message.ENDPOINT_ADDRESS, target.getEndpointInfo().getAddress());
+                    overrideAddressProperty(invocation.getContext());
+                    invocation.getContext().put(IS_SELECTED, null);
+                }
+            }
+        }
+
+        return getSelectedConduit(message);
+    }
     
     @Override
     protected Endpoint getFailoverTarget(final Exchange exchange, final InvocationContext invocation) {
@@ -136,4 +178,30 @@ public class CircuitBreakerTargetSelector extends FailoverTargetSelector {
             }
         }
     }
+    
+    private boolean isEndpointChanged(final String address, final Endpoint target) {
+        if (address != null) {
+            return !address.startsWith(target.getEndpointInfo().getAddress());
+        } 
+        
+        if (getEndpoint().equals(target)) {
+            return false;
+        }
+        
+        return !getEndpoint().getEndpointInfo().getAddress().startsWith(
+            target.getEndpointInfo().getAddress());
+    }
+    
+    protected boolean isFailoverRequired(final String address) {
+        if (address != null) {
+            for (final Map.Entry<String, CircuitBreaker> entry: circuits.entrySet()) {
+                if (address.startsWith(entry.getKey())) {
+                    return !entry.getValue().allowRequest();
+                }
+            }
+        }
+        
+        LOG.log(Level.WARNING, "No circuit breaker present for address: " + address);
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/248c8f04/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java
----------------------------------------------------------------------
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java
new file mode 100644
index 0000000..d51408b
--- /dev/null
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.cxf.clustering;
+
+public class FailoverFailedException extends RuntimeException {
+    private static final long serialVersionUID = 6987181998625258047L;
+    
+    public FailoverFailedException() {
+        super();
+    }
+    
+    public FailoverFailedException(String message) {
+        super(message);
+    }
+    
+    public FailoverFailedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/248c8f04/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
index ec00111..81cba78 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
@@ -22,10 +22,15 @@ package org.apache.cxf.systest.jaxrs.failover;
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.ws.rs.ProcessingException;
+
+import org.apache.cxf.clustering.FailoverFailedException;
 import org.apache.cxf.clustering.FailoverFeature;
 import org.apache.cxf.clustering.RandomStrategy;
 import org.apache.cxf.clustering.SequentialStrategy;
 import org.apache.cxf.clustering.circuitbreaker.CircuitBreakerFailoverFeature;
+import org.apache.cxf.systest.jaxrs.BookStore;
+import org.junit.Test;
 
 /**
  * Tests failover within a static cluster.
@@ -33,6 +38,30 @@ import org.apache.cxf.clustering.circuitbreaker.CircuitBreakerFailoverFeature;
 public class CircuitBreakerFailoverTest extends AbstractFailoverTest {
     public static final String NON_PORT = allocatePort(CircuitBreakerFailoverTest.class);
 
+    
+    @Test(expected = FailoverFailedException.class)
+    public void testSequentialStrategyUnavailableAlternatives() throws Exception {
+        FailoverFeature feature = getFeature(false, 
+            "http://localhost:" + NON_PORT + "/non-existent", 
+            "http://localhost:" + NON_PORT + "/non-existent2"); 
+        
+        final BookStore bookStore = getBookStore(
+            "http://localhost:" + NON_PORT + "/non-existent", feature);
+        
+        // First iteration is going to open all circuit breakers.
+        // Second iteration should not call any URL as all targets are not available. 
+        for (int i = 0; i < 2; ++i) {
+            try {
+                bookStore.getBook(1);
+                fail("Exception expected");
+            } catch (ProcessingException ex) {
+                if (ex.getCause() instanceof FailoverFailedException) {
+                    throw (FailoverFailedException) ex.getCause();
+                }
+            }
+        }
+    }
+    
     @Override
     protected FailoverFeature getFeature(boolean random, String ...address) {
         CircuitBreakerFailoverFeature feature = new CircuitBreakerFailoverFeature();