You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2015/11/06 03:29:00 UTC
cxf git commit: CXF-6622: Enhance Failover Feature to support Circuit
Breakers based implementation (using Apache Zest)
Repository: cxf
Updated Branches:
refs/heads/master 295091064 -> a261507eb
CXF-6622: Enhance Failover Feature to support Circuit Breakers based implementation (using Apache Zest)
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/a261507e
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/a261507e
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/a261507e
Branch: refs/heads/master
Commit: a261507ebd3104b1a00298801ec9815ed1e7a728
Parents: 2950910
Author: reta <dr...@gmail.com>
Authored: Thu Nov 5 21:28:43 2015 -0500
Committer: reta <dr...@gmail.com>
Committed: Thu Nov 5 21:28:43 2015 -0500
----------------------------------------------------------------------
parent/pom.xml | 6 +
rt/features/clustering/pom.xml | 5 +
.../CircuitBreakerTargetSelector.java | 140 +++++++
.../cxf/clustering/FailoverTargetSelector.java | 21 +-
.../LoadDistributorTargetSelector.java | 4 +-
.../circuitbreaker/CircuitBreaker.java | 43 ++
.../CircuitBreakerFailoverFeature.java | 58 +++
.../circuitbreaker/ZestCircuitBreaker.java | 47 +++
systests/jaxrs/pom.xml | 5 +
.../jaxrs/failover/AbstractFailoverTest.java | 395 +++++++++++++++++++
.../failover/CircuitBreakerFailoverTest.java | 55 +++
.../systest/jaxrs/failover/FailoverTest.java | 369 +----------------
12 files changed, 777 insertions(+), 371 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 9971539..84df93e 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -195,6 +195,7 @@
<cxf.tika.version>1.6</cxf.tika.version>
<cxf.jexl.version>2.1.1</cxf.jexl.version>
<cxf.htrace.version>4.0.1-incubating</cxf.htrace.version>
+ <cxf.zest.version>2.1</cxf.zest.version>
<cxf.checkstyle.extension />
<cxf.jaxb.context.class />
<cxf.jaxb.context.class.property>none</cxf.jaxb.context.class.property>
@@ -1791,6 +1792,11 @@
<artifactId>htrace-core4</artifactId>
<version>${cxf.htrace.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.qi4j.library</groupId>
+ <artifactId>org.qi4j.library.circuitbreaker</artifactId>
+ <version>${cxf.zest.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
<profiles>
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/pom.xml
----------------------------------------------------------------------
diff --git a/rt/features/clustering/pom.xml b/rt/features/clustering/pom.xml
index a5e944c..6faee2c 100644
--- a/rt/features/clustering/pom.xml
+++ b/rt/features/clustering/pom.xml
@@ -85,5 +85,10 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.qi4j.library</groupId>
+ <artifactId>org.qi4j.library.circuitbreaker</artifactId>
+ <optional>true</optional>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
----------------------------------------------------------------------
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
new file mode 100644
index 0000000..08c9fe6
--- /dev/null
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.clustering;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.clustering.circuitbreaker.CircuitBreaker;
+import org.apache.cxf.clustering.circuitbreaker.ZestCircuitBreaker;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+
+/**
+ * Failover target selector that keeps one {@link CircuitBreaker} per alternate address and
+ * only offers the failover strategy those addresses whose circuit breaker currently allows
+ * requests. Invocation outcomes are reported back to the circuit breaker registered for the
+ * address found in the request context.
+ */
+public class CircuitBreakerTargetSelector extends FailoverTargetSelector {
+    /** Default timeout passed to each circuit breaker (1 minute). */
+    public static final int DEFAULT_TIMEOUT = 1000 * 60 /* 1 minute timeout as default */;
+    /** Default threshold passed to each circuit breaker. */
+    public static final int DEFAULT_THRESHOLD = 1;
+    /**
+     * Misspelled alias of {@link #DEFAULT_THRESHOLD}, kept so that existing references
+     * continue to compile. Prefer {@link #DEFAULT_THRESHOLD} in new code.
+     */
+    public static final int DEFAULT_THESHOLD = DEFAULT_THRESHOLD;
+
+    private static final Logger LOG = LogUtils.getL7dLogger(CircuitBreakerTargetSelector.class);
+
+    private final int threshold;
+    private final long timeout;
+    /** Circuit breakers keyed by alternate address. */
+    private final ConcurrentMap<String, CircuitBreaker> circuits = new ConcurrentHashMap<>();
+
+    /**
+     * @param threshold threshold handed to every circuit breaker created by this selector
+     * @param timeout timeout handed to every circuit breaker created by this selector
+     */
+    public CircuitBreakerTargetSelector(final int threshold, final long timeout) {
+        super();
+        this.threshold = threshold;
+        this.timeout = timeout;
+    }
+
+    /** Creates a selector using {@link #DEFAULT_THRESHOLD} and {@link #DEFAULT_TIMEOUT}. */
+    public CircuitBreakerTargetSelector() {
+        this(DEFAULT_THRESHOLD, DEFAULT_TIMEOUT);
+    }
+
+    /**
+     * Sets the failover strategy and registers a circuit breaker for each of its non-empty
+     * alternate addresses that does not have one yet.
+     *
+     * @param strategy the failover strategy, may be null
+     */
+    @Override
+    public synchronized void setStrategy(FailoverStrategy strategy) {
+        super.setStrategy(strategy);
+
+        if (strategy == null) {
+            return;
+        }
+
+        final List<String> alternatives =
+            strategy.getAlternateAddresses(null /* no Exchange at this point */);
+        // NOTE(review): a strategy without configured addresses presumably returns null here;
+        // guard so that installing such a strategy does not fail.
+        if (alternatives != null) {
+            for (String alternative: alternatives) {
+                if (!StringUtils.isEmpty(alternative)) {
+                    circuits.putIfAbsent(
+                        alternative,
+                        new ZestCircuitBreaker(threshold, timeout)
+                    );
+                }
+            }
+        }
+    }
+
+    /**
+     * Picks the failover target among the addresses whose circuit breaker allows requests.
+     *
+     * @param exchange the current Exchange
+     * @param invocation the current invocation context
+     * @return the current endpoint re-pointed to the selected address, or null if no circuit
+     * breaker is registered, none allows requests, or the strategy selects nothing
+     */
+    @Override
+    protected Endpoint getFailoverTarget(final Exchange exchange, final InvocationContext invocation) {
+        if (circuits.isEmpty()) {
+            LOG.log(Level.SEVERE, "No alternative addresses configured");
+            return null;
+        }
+
+        final List<String> alternateAddresses = new ArrayList<>();
+        for (final Map.Entry<String, CircuitBreaker> entry: circuits.entrySet()) {
+            if (entry.getValue().allowRequest()) {
+                alternateAddresses.add(entry.getKey());
+            }
+        }
+
+        Endpoint failoverTarget = null;
+        if (!alternateAddresses.isEmpty()) {
+            final String alternateAddress = getStrategy().selectAlternateAddress(alternateAddresses);
+
+            // Reuse current endpoint
+            if (alternateAddress != null) {
+                failoverTarget = getEndpoint();
+                failoverTarget.getEndpointInfo().setAddress(alternateAddress);
+            }
+        }
+
+        return failoverTarget;
+    }
+
+    /**
+     * Reports the failure to the circuit breaker registered for the invoked address, if any.
+     *
+     * @param context the invocation context
+     * @param ex the exception that caused the failure
+     */
+    @Override
+    protected void onFailure(InvocationContext context, Exception ex) {
+        super.onFailure(context, ex);
+
+        final CircuitBreaker circuitBreaker = findCircuitBreaker(context);
+        if (circuitBreaker != null) {
+            circuitBreaker.markFailure(ex);
+        }
+    }
+
+    /**
+     * Reports the success to the circuit breaker registered for the invoked address, if any.
+     *
+     * @param context the invocation context
+     */
+    @Override
+    protected void onSuccess(InvocationContext context) {
+        super.onSuccess(context);
+
+        final CircuitBreaker circuitBreaker = findCircuitBreaker(context);
+        if (circuitBreaker != null) {
+            circuitBreaker.markSuccess();
+        }
+    }
+
+    /**
+     * Finds the circuit breaker registered for the endpoint address stored in the request
+     * context of the given invocation.
+     *
+     * @param context the invocation context
+     * @return the matching circuit breaker, or null if there is no request context, no
+     * endpoint address in it, or no circuit breaker registered for that address
+     */
+    private CircuitBreaker findCircuitBreaker(final InvocationContext context) {
+        final Map<String, Object> requestContext =
+            CastUtils.cast((Map<?, ?>)context.getContext().get(Client.REQUEST_CONTEXT));
+
+        CircuitBreaker circuitBreaker = null;
+        if (requestContext != null) {
+            final String address = (String)requestContext.get(Message.ENDPOINT_ADDRESS);
+            // ConcurrentHashMap rejects null keys with a NullPointerException
+            if (address != null) {
+                circuitBreaker = circuits.get(address);
+            }
+        }
+        return circuitBreaker;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
----------------------------------------------------------------------
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
index d364a55..21f129e 100644
--- a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
@@ -139,8 +139,9 @@ public class FailoverTargetSelector extends AbstractConduitSelector {
}
boolean failover = false;
- if (requiresFailover(exchange)) {
- onFailure(invocation);
+ final Exception ex = getExceptionIfPresent(exchange);
+ if (requiresFailover(exchange, ex)) {
+ onFailure(invocation, ex);
Conduit old = (Conduit)exchange.getOutMessage().remove(Conduit.class.getName());
Endpoint failoverTarget = getFailoverTarget(exchange, invocation);
@@ -208,7 +209,7 @@ public class FailoverTargetSelector extends AbstractConduitSelector {
protected void onSuccess(InvocationContext context) {
}
- protected void onFailure(InvocationContext context) {
+ protected void onFailure(InvocationContext context, Exception ex) {
}
/**
@@ -260,11 +261,7 @@ public class FailoverTargetSelector extends AbstractConduitSelector {
* @param exchange the current Exchange
* @return boolean true if a failover should be attempted
*/
- protected boolean requiresFailover(Exchange exchange) {
- Message outMessage = exchange.getOutMessage();
- Exception ex = outMessage.get(Exception.class) != null
- ? outMessage.get(Exception.class)
- : exchange.get(Exception.class);
+ protected boolean requiresFailover(Exchange exchange, Exception ex) {
getLogger().log(Level.FINE,
"CHECK_LAST_INVOKE_FAILED",
new Object[] {ex != null});
@@ -286,6 +283,14 @@ public class FailoverTargetSelector extends AbstractConduitSelector {
return failover;
}
+
+    /**
+     * Looks up the exception recorded for the current invocation, preferring the one stored
+     * on the outbound message over the one stored on the exchange itself.
+     *
+     * @param exchange the current Exchange
+     * @return the recorded exception, or null if neither location holds one
+     */
+    private Exception getExceptionIfPresent(Exchange exchange) {
+        final Exception fromOutMessage = exchange.getOutMessage().get(Exception.class);
+        if (fromOutMessage != null) {
+            return fromOutMessage;
+        }
+        return exchange.get(Exception.class);
+    }
/**
* Get the failover target endpoint, if a suitable one is available.
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java
----------------------------------------------------------------------
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java
index c4cd273..97bfd27 100644
--- a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java
@@ -212,8 +212,8 @@ public class LoadDistributorTargetSelector extends FailoverTargetSelector {
}
@Override
- protected boolean requiresFailover(Exchange exchange) {
- return failover && super.requiresFailover(exchange);
+ protected boolean requiresFailover(Exchange exchange, Exception ex) {
+ return failover && super.requiresFailover(exchange, ex);
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreaker.java
----------------------------------------------------------------------
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreaker.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreaker.java
new file mode 100644
index 0000000..4a4e3e0
--- /dev/null
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreaker.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.clustering.circuitbreaker;
+
+/**
+ * Basic abstraction of a circuit breaker implementation.
+ */
+public interface CircuitBreaker {
+    /**
+     * Tells whether a request is allowed to go through (i.e. whether the circuit breaker
+     * is closed or open).
+     * @return "false" if circuit breaker is open, "true" otherwise
+     */
+    boolean allowRequest();
+
+    /**
+     * Reports a failure condition to the circuit breaker.
+     * @param cause the exception that happened (could be null in case the error is deduced
+     * from response status/code).
+     */
+    void markFailure(Throwable cause);
+
+    /**
+     * Reports a successful invocation to the circuit breaker.
+     */
+    void markSuccess();
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreakerFailoverFeature.java
----------------------------------------------------------------------
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreakerFailoverFeature.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreakerFailoverFeature.java
new file mode 100644
index 0000000..5ba5eb3
--- /dev/null
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreakerFailoverFeature.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.clustering.circuitbreaker;
+
+import org.apache.cxf.clustering.CircuitBreakerTargetSelector;
+import org.apache.cxf.clustering.FailoverFeature;
+import org.apache.cxf.clustering.FailoverTargetSelector;
+
+import static org.apache.cxf.clustering.CircuitBreakerTargetSelector.DEFAULT_THESHOLD;
+import static org.apache.cxf.clustering.CircuitBreakerTargetSelector.DEFAULT_TIMEOUT;
+
+/**
+ * {@link FailoverFeature} whose target selector is a {@link CircuitBreakerTargetSelector}
+ * configured with this feature's threshold and timeout.
+ */
+public class CircuitBreakerFailoverFeature extends FailoverFeature {
+    private final int threshold;
+    private final long timeout;
+    /** Created on the first call to {@link #getTargetSelector()}. */
+    private FailoverTargetSelector targetSelector;
+
+    /** Creates the feature with the default threshold and timeout of the target selector. */
+    public CircuitBreakerFailoverFeature() {
+        this(DEFAULT_THESHOLD, DEFAULT_TIMEOUT);
+    }
+
+    /**
+     * @param threshold threshold passed to the circuit breaker target selector
+     * @param timeout timeout passed to the circuit breaker target selector
+     */
+    public CircuitBreakerFailoverFeature(int threshold, long timeout) {
+        this.threshold = threshold;
+        this.timeout = timeout;
+    }
+
+    /**
+     * Returns the circuit breaker target selector, creating it on first use.
+     *
+     * @return the target selector, the same instance on every call
+     */
+    @Override
+    public FailoverTargetSelector getTargetSelector() {
+        FailoverTargetSelector selector = this.targetSelector;
+        if (selector == null) {
+            selector = new CircuitBreakerTargetSelector(threshold, timeout);
+            this.targetSelector = selector;
+        }
+        return selector;
+    }
+
+    /** @return the threshold this feature was created with */
+    public int getThreshold() {
+        return threshold;
+    }
+
+    /** @return the timeout this feature was created with */
+    public long getTimeout() {
+        return timeout;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/ZestCircuitBreaker.java
----------------------------------------------------------------------
diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/ZestCircuitBreaker.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/ZestCircuitBreaker.java
new file mode 100644
index 0000000..a64936d
--- /dev/null
+++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/ZestCircuitBreaker.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.clustering.circuitbreaker;
+
+import org.qi4j.library.circuitbreaker.CircuitBreaker;
+
+/**
+ * Adapts the Zest (Qi4j) circuit breaker to the CXF
+ * {@link org.apache.cxf.clustering.circuitbreaker.CircuitBreaker} contract.
+ * <p>
+ * The class is itself the Zest circuit breaker (it extends it), so the threshold and timeout
+ * are passed straight to the superclass instead of being held by a second, wrapped instance.
+ * This keeps the state visible through the inherited Zest methods identical to the state
+ * driven through the CXF methods below.
+ * <p>
+ * Within this file the simple name {@code CircuitBreaker} refers to the imported Zest class.
+ */
+public class ZestCircuitBreaker extends CircuitBreaker
+        implements org.apache.cxf.clustering.circuitbreaker.CircuitBreaker {
+
+    /**
+     * @param threshold threshold passed to the Zest circuit breaker
+     * @param timeout timeout passed to the Zest circuit breaker
+     */
+    public ZestCircuitBreaker(final int threshold, final long timeout) {
+        super(threshold, timeout);
+    }
+
+    /** @return the result of the Zest circuit breaker's {@code isOn()} */
+    @Override
+    public boolean allowRequest() {
+        return isOn();
+    }
+
+    /**
+     * Forwards the failure to the Zest circuit breaker's {@code throwable(Throwable)}.
+     * @param cause the exception that happened
+     */
+    @Override
+    public void markFailure(Throwable cause) {
+        throwable(cause);
+    }
+
+    /** Forwards the success to the Zest circuit breaker's {@code success()}. */
+    @Override
+    public void markSuccess() {
+        success();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/systests/jaxrs/pom.xml
----------------------------------------------------------------------
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index b1f3573..33cd1ac 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -522,6 +522,11 @@
<artifactId>atmosphere-runtime</artifactId>
<version>${cxf.atmosphere.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.qi4j.library</groupId>
+ <artifactId>org.qi4j.library.circuitbreaker</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/AbstractFailoverTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/AbstractFailoverTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/AbstractFailoverTest.java
new file mode 100644
index 0000000..a3636cd
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/AbstractFailoverTest.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.failover;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.core.Response;
+
+import org.apache.cxf.clustering.FailoverFeature;
+import org.apache.cxf.clustering.FailoverTargetSelector;
+import org.apache.cxf.clustering.RandomStrategy;
+import org.apache.cxf.clustering.RetryStrategy;
+import org.apache.cxf.clustering.SequentialStrategy;
+import org.apache.cxf.endpoint.ConduitSelector;
+import org.apache.cxf.feature.Feature;
+import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean;
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.systest.jaxrs.Book;
+import org.apache.cxf.systest.jaxrs.BookStore;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Tests failover within a static cluster.
+ */
+public abstract class AbstractFailoverTest extends AbstractBusClientServerTestBase {
+ public static final String NON_PORT = allocatePort(AbstractFailoverTest.class);
+
+    /**
+     * Launches the test server, then polls the two active replicas (Server.ADDRESS2 and
+     * Server.ADDRESS3) for at most 60 iterations, sleeping one second between iterations,
+     * and stops polling as soon as both have been reported as started.
+     * NOTE(review): if a replica never reports as started the loop just ends and nothing
+     * fails here - confirm that this best-effort wait is intended.
+     */
+    @BeforeClass
+    public static void startServers() throws Exception {
+        assertTrue("server did not launch correctly",
+                   launchServer(Server.class, true));
+        boolean activeReplica1Started = false;
+        boolean activeReplica2Started = false;
+        for (int i = 0; i < 60; i++) {
+            // a replica that already answered is not queried again
+            if (!activeReplica1Started) {
+                activeReplica1Started = checkReplica(Server.ADDRESS2);
+            }
+            if (!activeReplica2Started) {
+                activeReplica2Started = checkReplica(Server.ADDRESS3);
+            }
+            if (activeReplica1Started && activeReplica2Started) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+    }
+    /**
+     * Probes a replica by requesting its "_wadl" query.
+     *
+     * @param address base address of the replica
+     * @return true if the request completed with HTTP status 200, false if it completed with
+     * another status or threw any exception
+     */
+    private static boolean checkReplica(String address) {
+        boolean available;
+        try {
+            Response response = WebClient.create(address).query("_wadl").get();
+            available = response.getStatus() == 200;
+        } catch (Exception ex) {
+            available = false;
+        }
+        return available;
+    }
+
+ @Test
+ public void testSequentialStrategy() throws Exception {
+ FailoverFeature feature =
+ getFeature(false, Server.ADDRESS2, Server.ADDRESS3);
+ strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, null, false, false, false);
+ }
+
+ @Test
+ public void testSequentialStrategyWebClient() throws Exception {
+ FailoverFeature feature =
+ getFeature(false, Server.ADDRESS2, Server.ADDRESS3);
+ strategyTestWebClient(Server.ADDRESS1, feature, Server.ADDRESS2, null, false, false);
+ }
+
+ @Test
+ public void testSequentialStrategyWith404() throws Exception {
+ FailoverFeature feature = getFeature(false, Server.ADDRESS3);
+ feature.getTargetSelector().setSupportNotAvailableErrorsOnly(true);
+ strategyTestWebClient(Server.ADDRESS2 + "/new", feature, Server.ADDRESS3, null, false, false);
+ }
+
+ @Test
+ public void testSequentialStrategyWith406() throws Exception {
+ FailoverFeature feature = getFeature(false, Server.ADDRESS3);
+ feature.getTargetSelector().setSupportNotAvailableErrorsOnly(false);
+ strategyTestWebClientHttpError(Server.ADDRESS2, feature, Server.ADDRESS3, false);
+ }
+
+ @Test
+ public void testSequentialStrategyWith406NoFailover() throws Exception {
+ FailoverFeature feature = getFeature(false, Server.ADDRESS3);
+ strategyTestWebClientHttpError(Server.ADDRESS2, feature, Server.ADDRESS3, true);
+ }
+
+ @Test
+ public void testRandomStrategyWebClient() throws Exception {
+ FailoverFeature feature =
+ getFeature(true, Server.ADDRESS3, Server.ADDRESS2);
+ strategyTestWebClient(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, true);
+ }
+
+ @Test
+ public void testRandomStrategy() throws Exception {
+ FailoverFeature feature =
+ getFeature(true, Server.ADDRESS2, Server.ADDRESS3);
+ strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, false, true, true);
+ }
+
+ @Test
+ public void testRandomStrategy2() throws Exception {
+ FailoverFeature feature =
+ getFeature(true, Server.ADDRESS2, Server.ADDRESS3);
+ strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, false, true, false);
+ }
+
+ @Test
+ public void testSequentialStrategyWithDiffBaseAddresses() throws Exception {
+ FailoverFeature feature =
+ getFeature(false, Server.ADDRESS3, null);
+ strategyTest(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, false, false);
+ }
+
+    // Same scenario as testSequentialStrategyWithDiffBaseAddresses, but with the last
+    // strategyTest argument (singleProxy) set to true, so one proxy is reused for all calls.
+    // NOTE(review): unlike the neighbouring tests this method carries no @Test annotation,
+    // so a JUnit 4 runner will not execute it - confirm whether it was disabled on purpose.
+    public void testSequentialStrategyWithDiffBaseAddresses2() throws Exception {
+        FailoverFeature feature =
+            getFeature(false, Server.ADDRESS3, null);
+        strategyTest(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, false, true);
+    }
+
+ @Test(expected = InternalServerErrorException.class)
+ public void testSequentialStrategyWithServerException() throws Exception {
+ FailoverFeature feature =
+ getFeature(false, Server.ADDRESS2, Server.ADDRESS3);
+ strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, true, false, false);
+ }
+
+ @Test(expected = ProcessingException.class)
+ public void testSequentialStrategyFailure() throws Exception {
+ FailoverFeature feature =
+ getFeature(false, "http://localhost:" + NON_PORT + "/non-existent");
+ strategyTest(Server.ADDRESS1, feature, null, null, false, false, false);
+ }
+
+ @Test
+ public void testSequentialStrategyWithRetries() throws Exception {
+ String address = "http://localhost:" + NON_PORT + "/non-existent";
+ String address2 = "http://localhost:" + NON_PORT + "/non-existent2";
+
+ FailoverFeature feature = new FailoverFeature();
+ List<String> alternateAddresses = new ArrayList<String>();
+ alternateAddresses.add(address);
+ alternateAddresses.add(address2);
+ CustomRetryStrategy strategy = new CustomRetryStrategy();
+ strategy.setMaxNumberOfRetries(5);
+ strategy.setAlternateAddresses(alternateAddresses);
+ feature.setStrategy(strategy);
+
+ BookStore store = getBookStore(address, feature);
+ try {
+ store.getBook("1");
+ fail("Exception expected");
+ } catch (ProcessingException ex) {
+ assertEquals(10, strategy.getTotalCount());
+ assertEquals(5, strategy.getAddressCount(address));
+ assertEquals(5, strategy.getAddressCount(address2));
+ }
+ }
+
+ protected abstract FailoverFeature getFeature(boolean random, String ...address);
+
+
+
+ protected BookStore getBookStore(String address,
+ FailoverFeature feature) throws Exception {
+ JAXRSClientFactoryBean bean = createBean(address, feature);
+ bean.setServiceClass(BookStore.class);
+ return bean.create(BookStore.class);
+ }
+
+ protected WebClient getWebClient(String address,
+ FailoverFeature feature) throws Exception {
+ JAXRSClientFactoryBean bean = createBean(address, feature);
+
+ return bean.createWebClient();
+ }
+
+ protected JAXRSClientFactoryBean createBean(String address,
+ FailoverFeature feature) {
+ JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean();
+ bean.setAddress(address);
+ List<Feature> features = new ArrayList<Feature>();
+ features.add(feature);
+ bean.setFeatures(features);
+
+ return bean;
+ }
+
+ protected void strategyTest(String inactiveReplica,
+ FailoverFeature feature,
+ String activeReplica1,
+ String activeReplica2,
+ boolean expectServerException,
+ boolean expectRandom,
+ boolean singleProxy) throws Exception {
+ boolean randomized = false;
+ String prevEndpoint = null;
+ BookStore bookStore = null;
+
+ if (singleProxy) {
+ bookStore = getBookStore(inactiveReplica, feature);
+ }
+
+ for (int i = 0; i < 20; i++) {
+ if (!singleProxy) {
+ feature.getTargetSelector().close();
+ bookStore = getBookStore(inactiveReplica, feature);
+ }
+ verifyStrategy(bookStore, expectRandom
+ ? RandomStrategy.class
+ : SequentialStrategy.class);
+ Exception ex = null;
+ try {
+ if (expectServerException) {
+ bookStore.getBook("9999");
+ fail("Exception expected");
+ } else {
+ Book book = bookStore.echoBookElementJson(new Book("CXF", 123));
+ assertNotNull("expected non-null response", book);
+ assertEquals("unexpected id", 123L, book.getId());
+ }
+ } catch (Exception error) {
+ if (!expectServerException) {
+ //String currEndpoint = getCurrentEndpointAddress(bookStore);
+ //assertTrue(currEndpoint.equals(inactiveReplica));
+ throw error;
+ }
+ ex = error;
+ }
+ String currEndpoint = getCurrentEndpointAddress(bookStore);
+ assertFalse(currEndpoint.equals(inactiveReplica));
+ if (expectRandom) {
+ assertTrue(currEndpoint.equals(activeReplica1) || currEndpoint.equals(activeReplica2));
+ } else {
+ assertEquals(activeReplica1, currEndpoint);
+ }
+ if (expectServerException) {
+ assertNotNull(ex);
+ throw ex;
+ }
+
+ if (!(prevEndpoint == null || currEndpoint.equals(prevEndpoint))) {
+ randomized = true;
+ }
+ prevEndpoint = currEndpoint;
+ }
+ if (!singleProxy) {
+ assertEquals("unexpected random/sequential distribution of failovers",
+ expectRandom,
+ randomized);
+ }
+ }
+
+ protected void strategyTestWebClient(String inactiveReplica,
+ FailoverFeature feature,
+ String activeReplica1,
+ String activeReplica2,
+ boolean expectServerException,
+ boolean expectRandom) throws Exception {
+ boolean randomized = false;
+ String prevEndpoint = null;
+ for (int i = 0; i < 20; i++) {
+ feature.getTargetSelector().close();
+ WebClient bookStore = getWebClient(inactiveReplica, feature);
+ verifyStrategy(bookStore, expectRandom
+ ? RandomStrategy.class
+ : SequentialStrategy.class);
+ String bookId = expectServerException ? "9999" : "123";
+ bookStore.path("bookstore/books").path(bookId);
+ Exception ex = null;
+ try {
+ Book book = bookStore.get(Book.class);
+ assertNotNull("expected non-null response", book);
+ assertEquals("unexpected id", 123L, book.getId());
+ } catch (Exception error) {
+ if (!expectServerException) {
+ throw error;
+ }
+ ex = error;
+ }
+ String currEndpoint = getCurrentEndpointAddress(bookStore);
+ assertFalse(currEndpoint.equals(inactiveReplica));
+ if (expectRandom) {
+ assertTrue(currEndpoint.equals(activeReplica1) || currEndpoint.equals(activeReplica2));
+ } else {
+ assertTrue(currEndpoint.equals(activeReplica1));
+ }
+ if (expectServerException) {
+ assertNotNull(ex);
+ throw ex;
+ }
+
+ if (!(prevEndpoint == null || currEndpoint.equals(prevEndpoint))) {
+ randomized = true;
+ }
+ prevEndpoint = currEndpoint;
+ }
+ assertEquals("unexpected random/sequential distribution of failovers",
+ expectRandom,
+ randomized);
+ }
+
+ protected void strategyTestWebClientHttpError(String currentReplica,
+ FailoverFeature feature,
+ String newReplica,
+ boolean notAvailableOnly) throws Exception {
+ WebClient bookStore = getWebClient(currentReplica, feature);
+ verifyStrategy(bookStore, SequentialStrategy.class);
+ bookStore.path("bookstore/webappexceptionXML");
+ Response r = bookStore.get();
+ assertEquals(406, r.getStatus());
+ String currEndpoint = getCurrentEndpointAddress(bookStore);
+ if (notAvailableOnly) {
+ assertTrue(currEndpoint.equals(currentReplica));
+ } else {
+ assertTrue(currEndpoint.equals(newReplica));
+ }
+ }
+
+
+    /**
+     * Returns the base URI the given client currently points to, after asserting that the
+     * client's current URI starts with that base URI.
+     *
+     * @param client a proxy or WebClient accepted by WebClient.client(Object)
+     * @return the client's base URI as a string
+     */
+    protected String getCurrentEndpointAddress(Object client) {
+        String currentBaseURI = WebClient.client(client).getBaseURI().toString();
+        String currentURI = WebClient.client(client).getCurrentURI().toString();
+        assertTrue(currentURI.startsWith(currentBaseURI));
+        return currentBaseURI;
+    }
+
+
+    /**
+     * Asserts that the client's conduit selector is a FailoverTargetSelector whose strategy
+     * is an instance of the expected class.
+     *
+     * @param proxy the client (proxy or WebClient) to inspect
+     * @param clz the expected strategy class
+     */
+    protected void verifyStrategy(Object proxy, Class<?> clz) {
+        ConduitSelector selector = WebClient.getConfig(proxy).getConduitSelector();
+        if (!(selector instanceof FailoverTargetSelector)) {
+            // fail() always throws, so the cast below is only reached for a failover selector
+            fail("unexpected conduit selector: " + selector);
+        }
+        Object strategy = ((FailoverTargetSelector)selector).getStrategy();
+        assertTrue("unexpected strategy", clz.isInstance(strategy));
+    }
+
+    /**
+     * Retry strategy that records how many times an alternate address was requested, in total
+     * and per returned address, so that tests can assert on the retry distribution.
+     */
+    private static class CustomRetryStrategy extends RetryStrategy {
+        private int totalCount;
+        private final Map<String, Integer> map = new HashMap<String, Integer>();
+
+        /** Delegates to the parent strategy and counts the call and the address it returned. */
+        @Override
+        protected <T> T getNextAlternate(List<T> alternates) {
+            totalCount++;
+            T next = super.getNextAlternate(alternates);
+            String address = (String)next;
+            Integer count = map.get(address);
+            if (count == null) {
+                count = 0;
+            }
+            count++;
+            map.put(address, count);
+            return next;
+        }
+
+        /**
+         * @return the number of recorded selections minus 2
+         * NOTE(review): presumably discounts one initial selection per address - confirm.
+         */
+        public int getTotalCount() {
+            return totalCount - 2;
+        }
+
+        /**
+         * @param address the alternate address to look up
+         * @return the number of recorded selections of the address minus 1, or 0 if the
+         * address was never selected (previously this case threw a NullPointerException
+         * while unboxing the missing map entry)
+         */
+        public int getAddressCount(String address) {
+            Integer count = map.get(address);
+            return count == null ? 0 : count - 1;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
new file mode 100644
index 0000000..ec00111
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.failover;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cxf.clustering.FailoverFeature;
+import org.apache.cxf.clustering.RandomStrategy;
+import org.apache.cxf.clustering.SequentialStrategy;
+import org.apache.cxf.clustering.circuitbreaker.CircuitBreakerFailoverFeature;
+
+/**
+ * Tests failover within a static cluster using the circuit-breaker based failover feature.
+ */
+public class CircuitBreakerFailoverTest extends AbstractFailoverTest {
+    // NOTE(review): presumably a port for addresses nothing listens on - confirm against allocatePort.
+    public static final String NON_PORT = allocatePort(CircuitBreakerFailoverTest.class);
+
+    /**
+     * Builds a {@link CircuitBreakerFailoverFeature} whose strategy is seeded
+     * with the given alternate addresses.
+     *
+     * @param random {@code true} to use a {@link RandomStrategy},
+     *               {@code false} to use a {@link SequentialStrategy}
+     * @param address alternate addresses, kept in the order given
+     * @return the configured feature
+     */
+    @Override
+    protected FailoverFeature getFeature(boolean random, String ...address) {
+        CircuitBreakerFailoverFeature circuitBreakerFeature = new CircuitBreakerFailoverFeature();
+        List<String> alternates = new ArrayList<String>();
+        for (String candidate : address) {
+            alternates.add(candidate);
+        }
+
+        if (random) {
+            RandomStrategy randomStrategy = new RandomStrategy();
+            randomStrategy.setAlternateAddresses(alternates);
+            circuitBreakerFeature.setStrategy(randomStrategy);
+        } else {
+            SequentialStrategy sequentialStrategy = new SequentialStrategy();
+            sequentialStrategy.setAlternateAddresses(alternates);
+            circuitBreakerFeature.setStrategy(sequentialStrategy);
+        }
+        return circuitBreakerFeature;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/FailoverTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/FailoverTest.java
index 95bfacd..f27d1a8 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/FailoverTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/FailoverTest.java
@@ -20,191 +20,41 @@
package org.apache.cxf.systest.jaxrs.failover;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-
-import javax.ws.rs.InternalServerErrorException;
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.core.Response;
import org.apache.cxf.clustering.FailoverFeature;
import org.apache.cxf.clustering.FailoverTargetSelector;
import org.apache.cxf.clustering.RandomStrategy;
-import org.apache.cxf.clustering.RetryStrategy;
import org.apache.cxf.clustering.SequentialStrategy;
-import org.apache.cxf.endpoint.ConduitSelector;
-import org.apache.cxf.feature.Feature;
-import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean;
-import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.systest.jaxrs.Book;
-import org.apache.cxf.systest.jaxrs.BookStore;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
-import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests failover within a static cluster.
*/
-public class FailoverTest extends AbstractBusClientServerTestBase {
- public static final String NON_PORT = allocatePort(FailoverTest.class);
-
- @BeforeClass
- public static void startServers() throws Exception {
- assertTrue("server did not launch correctly",
- launchServer(Server.class, true));
- boolean activeReplica1Started = false;
- boolean activeReplica2Started = false;
- for (int i = 0; i < 60; i++) {
- if (!activeReplica1Started) {
- activeReplica1Started = checkReplica(Server.ADDRESS2);
- }
- if (!activeReplica2Started) {
- activeReplica2Started = checkReplica(Server.ADDRESS3);
- }
- if (activeReplica1Started && activeReplica2Started) {
- break;
- }
- Thread.sleep(1000);
- }
- }
- private static boolean checkReplica(String address) {
- try {
- Response r = WebClient.create(address).query("_wadl").get();
- return r.getStatus() == 200;
- } catch (Exception ex) {
- return false;
- }
- }
-
- @Test
- public void testSequentialStrategy() throws Exception {
- FailoverFeature feature =
- getFeature(false, false, Server.ADDRESS2, Server.ADDRESS3);
- strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, null, false, false, false);
- }
-
+public class FailoverTest extends AbstractFailoverTest {
@Test
public void testSequentialStrategyWithCustomTargetSelector() throws Exception {
- FailoverFeature feature =
- getFeature(true, false, Server.ADDRESS2, Server.ADDRESS3);
+ FailoverFeature feature = getCustomFeature(true, false, Server.ADDRESS2, Server.ADDRESS3);
strategyTest("resolver://info", feature, Server.ADDRESS3, null, false, false, false);
}
@Test
public void testSequentialStrategyWithCustomTargetSelector2() throws Exception {
- FailoverFeature feature =
- getFeature(true, false, Server.ADDRESS2, Server.ADDRESS3);
+ FailoverFeature feature = getCustomFeature(true, false, Server.ADDRESS2, Server.ADDRESS3);
strategyTest("resolver://info", feature, Server.ADDRESS3, null, false, false, true);
}
- @Test
- public void testSequentialStrategyWebClient() throws Exception {
- FailoverFeature feature =
- getFeature(false, false, Server.ADDRESS2, Server.ADDRESS3);
- strategyTestWebClient(Server.ADDRESS1, feature, Server.ADDRESS2, null, false, false);
- }
-
- @Test
- public void testSequentialStrategyWith404() throws Exception {
- FailoverFeature feature = getFeature(false, false, Server.ADDRESS3);
- feature.getTargetSelector().setSupportNotAvailableErrorsOnly(true);
- strategyTestWebClient(Server.ADDRESS2 + "/new", feature, Server.ADDRESS3, null, false, false);
- }
-
- @Test
- public void testSequentialStrategyWith406() throws Exception {
- FailoverFeature feature = getFeature(false, false, Server.ADDRESS3);
- feature.getTargetSelector().setSupportNotAvailableErrorsOnly(false);
- strategyTestWebClientHttpError(Server.ADDRESS2, feature, Server.ADDRESS3, false);
- }
-
- @Test
- public void testSequentialStrategyWith406NoFailover() throws Exception {
- FailoverFeature feature = getFeature(false, false, Server.ADDRESS3);
- strategyTestWebClientHttpError(Server.ADDRESS2, feature, Server.ADDRESS3, true);
- }
-
- @Test
- public void testRandomStrategyWebClient() throws Exception {
- FailoverFeature feature =
- getFeature(false, true, Server.ADDRESS3, Server.ADDRESS2);
- strategyTestWebClient(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, true);
- }
-
- @Test
- public void testRandomStrategy() throws Exception {
- FailoverFeature feature =
- getFeature(false, true, Server.ADDRESS2, Server.ADDRESS3);
- strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, false, true, true);
- }
-
- @Test
- public void testRandomStrategy2() throws Exception {
- FailoverFeature feature =
- getFeature(false, true, Server.ADDRESS2, Server.ADDRESS3);
- strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, false, true, false);
- }
-
- @Test
- public void testSequentialStrategyWithDiffBaseAddresses() throws Exception {
- FailoverFeature feature =
- getFeature(false, false, Server.ADDRESS3, null);
- strategyTest(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, false, false);
- }
-
- public void testSequentialStrategyWithDiffBaseAddresses2() throws Exception {
- FailoverFeature feature =
- getFeature(false, false, Server.ADDRESS3, null);
- strategyTest(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, false, true);
- }
-
- @Test(expected = InternalServerErrorException.class)
- public void testSequentialStrategyWithServerException() throws Exception {
- FailoverFeature feature =
- getFeature(false, false, Server.ADDRESS2, Server.ADDRESS3);
- strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, true, false, false);
- }
-
- @Test(expected = ProcessingException.class)
- public void testSequentialStrategyFailure() throws Exception {
- FailoverFeature feature =
- getFeature(false, false, "http://localhost:" + NON_PORT + "/non-existent");
- strategyTest(Server.ADDRESS1, feature, null, null, false, false, false);
- }
-
- @Test
- public void testSequentialStrategyWithRetries() throws Exception {
- String address = "http://localhost:" + NON_PORT + "/non-existent";
- String address2 = "http://localhost:" + NON_PORT + "/non-existent2";
-
- FailoverFeature feature = new FailoverFeature();
- List<String> alternateAddresses = new ArrayList<String>();
- alternateAddresses.add(address);
- alternateAddresses.add(address2);
- CustomRetryStrategy strategy = new CustomRetryStrategy();
- strategy.setMaxNumberOfRetries(5);
- strategy.setAlternateAddresses(alternateAddresses);
- feature.setStrategy(strategy);
-
- BookStore store = getBookStore(address, feature);
- try {
- store.getBook("1");
- fail("Exception expected");
- } catch (ProcessingException ex) {
- assertEquals(10, strategy.getTotalCount());
- assertEquals(5, strategy.getAddressCount(address));
- assertEquals(5, strategy.getAddressCount(address2));
- }
+ @Override
+ protected FailoverFeature getFeature(boolean random, String... address) {
+ return getCustomFeature(false, random, address);
}
-
- private FailoverFeature getFeature(boolean custom, boolean random, String ...address) {
+ private FailoverFeature getCustomFeature(boolean custom, boolean random, String ...address) {
FailoverFeature feature = new FailoverFeature();
List<String> alternateAddresses = new ArrayList<String>();
for (String s : address) {
@@ -226,184 +76,7 @@ public class FailoverTest extends AbstractBusClientServerTestBase {
return feature;
}
-
- protected BookStore getBookStore(String address,
- FailoverFeature feature) throws Exception {
- JAXRSClientFactoryBean bean = createBean(address, feature);
- bean.setServiceClass(BookStore.class);
- return bean.create(BookStore.class);
- }
-
- protected WebClient getWebClient(String address,
- FailoverFeature feature) throws Exception {
- JAXRSClientFactoryBean bean = createBean(address, feature);
-
- return bean.createWebClient();
- }
-
- protected JAXRSClientFactoryBean createBean(String address,
- FailoverFeature feature) {
- JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean();
- bean.setAddress(address);
- List<Feature> features = new ArrayList<Feature>();
- features.add(feature);
- bean.setFeatures(features);
-
- return bean;
- }
-
- protected void strategyTest(String inactiveReplica,
- FailoverFeature feature,
- String activeReplica1,
- String activeReplica2,
- boolean expectServerException,
- boolean expectRandom,
- boolean singleProxy) throws Exception {
- boolean randomized = false;
- String prevEndpoint = null;
- BookStore bookStore = null;
-
- if (singleProxy) {
- bookStore = getBookStore(inactiveReplica, feature);
- }
-
- for (int i = 0; i < 20; i++) {
- if (!singleProxy) {
- feature.getTargetSelector().close();
- bookStore = getBookStore(inactiveReplica, feature);
- }
- verifyStrategy(bookStore, expectRandom
- ? RandomStrategy.class
- : SequentialStrategy.class);
- Exception ex = null;
- try {
- if (expectServerException) {
- bookStore.getBook("9999");
- fail("Exception expected");
- } else {
- Book book = bookStore.echoBookElementJson(new Book("CXF", 123));
- assertNotNull("expected non-null response", book);
- assertEquals("unexpected id", 123L, book.getId());
- }
- } catch (Exception error) {
- if (!expectServerException) {
- //String currEndpoint = getCurrentEndpointAddress(bookStore);
- //assertTrue(currEndpoint.equals(inactiveReplica));
- throw error;
- }
- ex = error;
- }
- String currEndpoint = getCurrentEndpointAddress(bookStore);
- assertFalse(currEndpoint.equals(inactiveReplica));
- if (expectRandom) {
- assertTrue(currEndpoint.equals(activeReplica1) || currEndpoint.equals(activeReplica2));
- } else {
- assertEquals(activeReplica1, currEndpoint);
- }
- if (expectServerException) {
- assertNotNull(ex);
- throw ex;
- }
-
- if (!(prevEndpoint == null || currEndpoint.equals(prevEndpoint))) {
- randomized = true;
- }
- prevEndpoint = currEndpoint;
- }
- if (!singleProxy) {
- assertEquals("unexpected random/sequential distribution of failovers",
- expectRandom,
- randomized);
- }
- }
-
- protected void strategyTestWebClient(String inactiveReplica,
- FailoverFeature feature,
- String activeReplica1,
- String activeReplica2,
- boolean expectServerException,
- boolean expectRandom) throws Exception {
- boolean randomized = false;
- String prevEndpoint = null;
- for (int i = 0; i < 20; i++) {
- feature.getTargetSelector().close();
- WebClient bookStore = getWebClient(inactiveReplica, feature);
- verifyStrategy(bookStore, expectRandom
- ? RandomStrategy.class
- : SequentialStrategy.class);
- String bookId = expectServerException ? "9999" : "123";
- bookStore.path("bookstore/books").path(bookId);
- Exception ex = null;
- try {
- Book book = bookStore.get(Book.class);
- assertNotNull("expected non-null response", book);
- assertEquals("unexpected id", 123L, book.getId());
- } catch (Exception error) {
- if (!expectServerException) {
- throw error;
- }
- ex = error;
- }
- String currEndpoint = getCurrentEndpointAddress(bookStore);
- assertFalse(currEndpoint.equals(inactiveReplica));
- if (expectRandom) {
- assertTrue(currEndpoint.equals(activeReplica1) || currEndpoint.equals(activeReplica2));
- } else {
- assertTrue(currEndpoint.equals(activeReplica1));
- }
- if (expectServerException) {
- assertNotNull(ex);
- throw ex;
- }
-
- if (!(prevEndpoint == null || currEndpoint.equals(prevEndpoint))) {
- randomized = true;
- }
- prevEndpoint = currEndpoint;
- }
- assertEquals("unexpected random/sequential distribution of failovers",
- expectRandom,
- randomized);
- }
-
- protected void strategyTestWebClientHttpError(String currentReplica,
- FailoverFeature feature,
- String newReplica,
- boolean notAvailableOnly) throws Exception {
- WebClient bookStore = getWebClient(currentReplica, feature);
- verifyStrategy(bookStore, SequentialStrategy.class);
- bookStore.path("bookstore/webappexceptionXML");
- Response r = bookStore.get();
- assertEquals(406, r.getStatus());
- String currEndpoint = getCurrentEndpointAddress(bookStore);
- if (notAvailableOnly) {
- assertTrue(currEndpoint.equals(currentReplica));
- } else {
- assertTrue(currEndpoint.equals(newReplica));
- }
- }
-
- protected String getCurrentEndpointAddress(Object client) {
- String currentBaseURI = WebClient.client(client).getBaseURI().toString();
- String currentURI = WebClient.client(client).getCurrentURI().toString();
- assertTrue(currentURI.startsWith(currentBaseURI));
- return currentBaseURI;
- }
-
-
- protected void verifyStrategy(Object proxy, Class<?> clz) {
- ConduitSelector conduitSelector =
- WebClient.getConfig(proxy).getConduitSelector();
- if (conduitSelector instanceof FailoverTargetSelector) {
- Object strategy =
- ((FailoverTargetSelector)conduitSelector).getStrategy();
- assertTrue("unexpected strategy", clz.isInstance(strategy));
- } else {
- fail("unexpected conduit selector: " + conduitSelector);
- }
- }
-
private static class ReplaceInitialAddressSelector extends FailoverTargetSelector {
@Override
public synchronized void prepare(Message message) {
@@ -414,34 +87,8 @@ public class FailoverTest extends AbstractBusClientServerTestBase {
}
@Override
- protected boolean requiresFailover(Exchange exchange) {
+ protected boolean requiresFailover(Exchange exchange, Exception ex) {
return false;
}
}
-
- private static class CustomRetryStrategy extends RetryStrategy {
- private int totalCount;
- private Map<String, Integer> map = new HashMap<String, Integer>();
- @Override
- protected <T> T getNextAlternate(List<T> alternates) {
- totalCount++;
- T next = super.getNextAlternate(alternates);
- String address = (String)next;
- Integer count = map.get(address);
- if (count == null) {
- count = 0;
- }
- count++;
- map.put(address, count);
- return next;
- }
-
- public int getTotalCount() {
- return totalCount - 2;
- }
-
- public int getAddressCount(String address) {
- return map.get(address) - 1;
- }
- }
}