You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2015/11/19 03:05:01 UTC
cxf git commit: CXF-6622: Enhance Failover Feature to support Circuit
Breakers based implementation. Refactored conduit selection logic to respect
endpoint status.
Repository: cxf
Updated Branches:
refs/heads/master ce969249d -> 248c8f045
CXF-6622: Enhance Failover Feature to support Circuit Breakers based implementation. Refactored conduit selection logic to respect endpoint status.
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/248c8f04
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/248c8f04
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/248c8f04
Branch: refs/heads/master
Commit: 248c8f0458ce938a65c35006d484c6a4610cde1b
Parents: ce96924
Author: reta <dr...@gmail.com>
Authored: Wed Nov 18 21:04:35 2015 -0500
Committer: reta <dr...@gmail.com>
Committed: Wed Nov 18 21:04:35 2015 -0500
----------------------------------------------------------------------
.../CircuitBreakerTargetSelector.java | 68 ++++++++++++++++++++
.../cxf/clustering/FailoverFailedException.java | 37 +++++++++++
.../failover/CircuitBreakerFailoverTest.java | 29 +++++++++
3 files changed, 134 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/248c8f04/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
----------------------------------------------------------------------
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
index 86aabaa..62541d9 100644
--- a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
@@ -33,13 +33,16 @@ import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Conduit;
public class CircuitBreakerTargetSelector extends FailoverTargetSelector {
public static final int DEFAULT_TIMEOUT = 1000 * 60 /* 1 minute timeout as default */;
public static final int DEFAULT_THESHOLD = 1;
+ private static final String IS_SELECTED = "org.apache.cxf.clustering.CircuitBreakerTargetSelector.IS_SELECTED";
private static final Logger LOG = LogUtils.getL7dLogger(CircuitBreakerTargetSelector.class);
private final int threshold;
@@ -60,6 +63,14 @@ public class CircuitBreakerTargetSelector extends FailoverTargetSelector {
public synchronized void setStrategy(FailoverStrategy strategy) {
super.setStrategy(strategy);
+ // Registering the original endpoint in the list of circuit breakers
+ if (getEndpoint() != null) {
+ circuits.putIfAbsent(
+ getEndpoint().getEndpointInfo().getAddress(),
+ new ZestCircuitBreaker(threshold, timeout)
+ );
+ }
+
if (strategy != null) {
for (String alternative: strategy.getAlternateAddresses(null /* no Exchange at this point */)) {
if (!StringUtils.isEmpty(alternative)) {
@@ -71,6 +82,37 @@ public class CircuitBreakerTargetSelector extends FailoverTargetSelector {
}
}
}
+ /**
+  * Picks the conduit for an outgoing message, switching to a failover
+  * target first when the circuit breaker guarding the current address
+  * does not allow the request.
+  * <p>
+  * A conduit already attached to the message is returned untouched. The
+  * failover check only runs when an invocation is registered for the
+  * exchange and its context has not been marked with {@code IS_SELECTED}.
+  *
+  * @param message the outgoing message
+  * @return the conduit to use for the message
+  * @throws Fault wrapping a {@link FailoverFailedException} when failover
+  *         is required but no failover target could be obtained
+  */
+ @Override
+ public synchronized Conduit selectConduit(Message message) {
+     final Conduit preselected = message.get(Conduit.class);
+     if (preselected != null) {
+         return preselected;
+     }
+
+     final Exchange exchange = message.getExchange();
+     final InvocationContext invocation = inProgress.get(new InvocationKey(exchange));
+     if (invocation == null || invocation.getContext().containsKey(IS_SELECTED)) {
+         // Nothing in progress for this exchange, or a target was already chosen.
+         return getSelectedConduit(message);
+     }
+
+     final String address = (String) message.get(Message.ENDPOINT_ADDRESS);
+     if (!isFailoverRequired(address)) {
+         return getSelectedConduit(message);
+     }
+
+     final Endpoint target = getFailoverTarget(exchange, invocation);
+     if (target == null) {
+         throw new Fault(new FailoverFailedException(
+             "None of alternative addresses are available at the moment"));
+     }
+
+     if (isEndpointChanged(address, target)) {
+         setEndpoint(target);
+         message.put(Message.ENDPOINT_ADDRESS, target.getEndpointInfo().getAddress());
+         overrideAddressProperty(invocation.getContext());
+         // Only the presence of the key matters; it is tested with containsKey above.
+         invocation.getContext().put(IS_SELECTED, null);
+     }
+
+     return getSelectedConduit(message);
+ }
@Override
protected Endpoint getFailoverTarget(final Exchange exchange, final InvocationContext invocation) {
@@ -136,4 +178,30 @@ public class CircuitBreakerTargetSelector extends FailoverTargetSelector {
}
}
}
+
+ /**
+  * Tells whether switching to {@code target} would actually change the
+  * address being used.
+  * <p>
+  * Addresses are compared by prefix: the current address counts as
+  * unchanged when it starts with the target endpoint's address.
+  *
+  * @param address the address currently set on the message, may be {@code null}
+  * @param target the candidate endpoint
+  * @return {@code true} if the target differs from the current address, or,
+  *         when {@code address} is {@code null}, from the selector's current endpoint
+  */
+ private boolean isEndpointChanged(final String address, final Endpoint target) {
+     if (address == null) {
+         // No explicit address on the message: compare against the selector's own endpoint.
+         return !getEndpoint().equals(target)
+             && !getEndpoint().getEndpointInfo().getAddress().startsWith(
+                 target.getEndpointInfo().getAddress());
+     }
+
+     return !address.startsWith(target.getEndpointInfo().getAddress());
+ }
+
+ /**
+  * Decides whether the request to {@code address} must be redirected to a
+  * failover target because its circuit breaker does not allow the request.
+  * <p>
+  * Registered addresses are matched as prefixes of {@code address}. When
+  * several registered addresses are prefixes of it (for example
+  * {@code http://host/service} and {@code http://host/service2}), the
+  * longest one wins, so the result does not depend on map iteration order.
+  *
+  * @param address the address the request is about to be sent to, may be {@code null}
+  * @return {@code true} if a matching circuit breaker refuses the request;
+  *         {@code false} if it allows the request or no circuit breaker matches
+  */
+ protected boolean isFailoverRequired(final String address) {
+     CircuitBreaker bestMatch = null;
+     int bestLength = -1;
+
+     if (address != null) {
+         for (final Map.Entry<String, CircuitBreaker> entry: circuits.entrySet()) {
+             final String registered = entry.getKey();
+             // Two distinct prefixes of one address never have equal length,
+             // so a strict comparison yields a unique winner.
+             if (registered.length() > bestLength && address.startsWith(registered)) {
+                 bestMatch = entry.getValue();
+                 bestLength = registered.length();
+             }
+         }
+     }
+
+     if (bestLength >= 0) {
+         // allowRequest() is consulted exactly once, on the most specific match.
+         return !bestMatch.allowRequest();
+     }
+
+     LOG.log(Level.WARNING, "No circuit breaker present for address: " + address);
+     return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/248c8f04/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java
----------------------------------------------------------------------
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java
new file mode 100644
index 0000000..d51408b
--- /dev/null
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.cxf.clustering;
+
+/**
+ * Unchecked exception signalling that a failover attempt could not be
+ * completed, e.g. because no alternative address is currently available.
+ */
+public class FailoverFailedException extends RuntimeException {
+    private static final long serialVersionUID = 6987181998625258047L;
+
+    /**
+     * Creates an exception with neither a detail message nor a cause.
+     */
+    public FailoverFailedException() {
+    }
+
+    /**
+     * Creates an exception with the given detail message.
+     *
+     * @param message the detail message
+     */
+    public FailoverFailedException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception with the given detail message and cause.
+     *
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public FailoverFailedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/248c8f04/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
index ec00111..81cba78 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
@@ -22,10 +22,15 @@ package org.apache.cxf.systest.jaxrs.failover;
import java.util.ArrayList;
import java.util.List;
+import javax.ws.rs.ProcessingException;
+
+import org.apache.cxf.clustering.FailoverFailedException;
import org.apache.cxf.clustering.FailoverFeature;
import org.apache.cxf.clustering.RandomStrategy;
import org.apache.cxf.clustering.SequentialStrategy;
import org.apache.cxf.clustering.circuitbreaker.CircuitBreakerFailoverFeature;
+import org.apache.cxf.systest.jaxrs.BookStore;
+import org.junit.Test;
/**
* Tests failover within a static cluster.
@@ -33,6 +38,30 @@ import org.apache.cxf.clustering.circuitbreaker.CircuitBreakerFailoverFeature;
public class CircuitBreakerFailoverTest extends AbstractFailoverTest {
public static final String NON_PORT = allocatePort(CircuitBreakerFailoverTest.class);
+
+ /**
+  * Calls a client whose primary and alternate addresses all point at
+  * non-existent resources, and expects a {@link FailoverFailedException}
+  * to surface as the cause of a {@link ProcessingException} within two calls.
+  */
+ @Test(expected = FailoverFailedException.class)
+ public void testSequentialStrategyUnavailableAlternatives() throws Exception {
+     final String primary = "http://localhost:" + NON_PORT + "/non-existent";
+     final String alternate = "http://localhost:" + NON_PORT + "/non-existent2";
+
+     final FailoverFeature feature = getFeature(false, primary, alternate);
+     final BookStore bookStore = getBookStore(primary, feature);
+
+     // The first call is expected to open all circuit breakers; the second
+     // should then be rejected without calling any URL, since no target is available.
+     int attempt = 0;
+     while (attempt < 2) {
+         try {
+             bookStore.getBook(1);
+             fail("Exception expected");
+         } catch (ProcessingException ex) {
+             final Throwable cause = ex.getCause();
+             if (cause instanceof FailoverFailedException) {
+                 // Unwrap so that the @Test(expected = ...) check sees it.
+                 throw (FailoverFailedException) cause;
+             }
+         }
+         ++attempt;
+     }
+ }
+
@Override
protected FailoverFeature getFeature(boolean random, String ...address) {
CircuitBreakerFailoverFeature feature = new CircuitBreakerFailoverFeature();