You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2013/08/06 13:11:10 UTC
[2/5] git commit: CAMEL-6609 fixed the issue of CXF FailoverFeature
does not take effect when camel-cxf producer uses async invocation
CAMEL-6609 Fixed the issue where the CXF FailoverFeature does not take effect when the camel-cxf producer uses async invocation
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3551f02d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3551f02d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3551f02d
Branch: refs/heads/camel-2.11.x
Commit: 3551f02da7b6e540ec361192aa97837a26658f54
Parents: 9f2ca67
Author: Willem Jiang <ni...@apache.org>
Authored: Tue Aug 6 10:27:52 2013 +0800
Committer: Willem Jiang <ni...@apache.org>
Committed: Tue Aug 6 19:00:51 2013 +0800
----------------------------------------------------------------------
components/camel-cxf/pom.xml | 7 +
.../camel/component/cxf/CxfClientCallback.java | 26 +++-
.../apache/camel/component/cxf/CxfProducer.java | 2 +-
.../component/cxf/FailOverFeatureTest.java | 142 +++++++++++++++++++
4 files changed, 175 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3551f02d/components/camel-cxf/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-cxf/pom.xml b/components/camel-cxf/pom.xml
index ee88a96..ee28da9 100644
--- a/components/camel-cxf/pom.xml
+++ b/components/camel-cxf/pom.xml
@@ -171,6 +171,13 @@
<scope>test</scope>
</dependency>
+ <!-- test-only: provides the CXF FailoverFeature used by FailOverFeatureTest -->
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-features-clustering</artifactId>
+ <version>${cxf-version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
http://git-wip-us.apache.org/repos/asf/camel/blob/3551f02d/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
index 2e6e60a..a6f936f 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
@@ -20,7 +20,9 @@ import java.util.Map;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.cxf.endpoint.Client;
import org.apache.cxf.endpoint.ClientCallback;
+import org.apache.cxf.endpoint.ClientImpl;
import org.apache.cxf.service.model.BindingOperationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,15 +35,25 @@ public class CxfClientCallback extends ClientCallback {
private final org.apache.cxf.message.Exchange cxfExchange;
private final BindingOperationInfo boi;
private final CxfBinding binding;
+ private final Client client;
public CxfClientCallback(AsyncCallback callback,
Exchange camelExchange,
org.apache.cxf.message.Exchange cxfExchange,
BindingOperationInfo boi,
CxfBinding binding) {
+ this(null, callback, camelExchange, cxfExchange, boi, binding);
+ }
+
+ public CxfClientCallback(Client client, AsyncCallback callback,
+ Exchange camelExchange,
+ org.apache.cxf.message.Exchange cxfExchange,
+ BindingOperationInfo boi,
+ CxfBinding binding) {
this.camelAsyncCallback = callback;
this.camelExchange = camelExchange;
this.cxfExchange = cxfExchange;
+ this.client = client;
this.boi = boi;
this.binding = binding;
}
@@ -69,7 +81,19 @@ public class CxfClientCallback extends ClientCallback {
public void handleException(Map<String, Object> ctx, Throwable ex) {
try {
super.handleException(ctx, ex);
- camelExchange.setException(ex);
+ // need to call the conduitSelector complete method to enable the fail over feature
+ if (client instanceof ClientImpl) {
+ ((ClientImpl)client).getConduitSelector().complete(cxfExchange);
+ ex = cxfExchange.getOutMessage().getContent(Exception.class);
+ if (ex == null && cxfExchange.getInMessage() != null) {
+ ex = cxfExchange.getInMessage().getContent(Exception.class);
+ }
+ if (ex != null) {
+ camelExchange.setException(ex);
+ }
+ } else {
+ camelExchange.setException(ex);
+ }
} finally {
// copy the context information and
// call camel callback
http://git-wip-us.apache.org/repos/asf/camel/blob/3551f02d/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
index 3f1ec4c..b8a803d 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
@@ -106,7 +106,7 @@ public class CxfProducer extends DefaultProducer implements AsyncProcessor {
invocationContext.put(Client.RESPONSE_CONTEXT, responseContext);
invocationContext.put(Client.REQUEST_CONTEXT, prepareRequest(camelExchange, cxfExchange));
- CxfClientCallback cxfClientCallback = new CxfClientCallback(callback, camelExchange, cxfExchange, boi,
+ CxfClientCallback cxfClientCallback = new CxfClientCallback(client, callback, camelExchange, cxfExchange, boi,
endpoint.getCxfBinding());
// send the CXF async request
client.invoke(cxfClientCallback, boi, getParams(endpoint, camelExchange),
http://git-wip-us.apache.org/repos/asf/camel/blob/3551f02d/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java
new file mode 100644
index 0000000..4b7239a
--- /dev/null
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.xml.ws.Endpoint;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cxf.CxfEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.cxf.clustering.FailoverFeature;
+import org.apache.cxf.clustering.RandomStrategy;
+import org.apache.cxf.frontend.ClientProxyFactoryBean;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks that a CXF {@link FailoverFeature} added to a camel-cxf producer endpoint takes
+ * effect (CAMEL-6609): each route forwards to an address where nothing is published, so a
+ * reply can only come back if the call is retried against the alternate SERVICE_ADDRESS.
+ */
+public class FailOverFeatureTest {
+
+    private static int port1 = CXFTestSupport.getPort1();
+    private static int port2 = CXFTestSupport.getPort2();
+    private static int port3 = CXFTestSupport.getPort3();
+    private static int port4 = AvailablePortFinder.getNextAvailable();
+
+    private static final String SERVICE_ADDRESS = "http://localhost:" + port1 + "/FailOverFeatureTest";
+    private static final String PAYLOAD_PROXY_ADDRESS = "http://localhost:" + port2 + "/FailOverFeatureTest/proxy";
+    private static final String POJO_PROXY_ADDRESS = "http://localhost:" + port3 + "/FailOverFeatureTest/proxy";
+    private static final String NONE_EXIST_ADDRESS = "http://localhost:" + port4 + "/FailOverFeatureTest";
+    private DefaultCamelContext context1;
+    private DefaultCamelContext context2;
+
+    @BeforeClass
+    public static void init() {
+        // publish the real web-service that the fail-over strategy falls back to
+        ServerFactoryBean factory = new ServerFactoryBean();
+        factory.setAddress(SERVICE_ADDRESS);
+        factory.setServiceBean(new HelloServiceImpl());
+        factory.create();
+    }
+
+    // stop whichever context the test started, even when its assertion failed
+    @After
+    public void tearDown() throws Exception {
+        if (context1 != null) {
+            context1.stop();
+        }
+        if (context2 != null) {
+            context2.stop();
+        }
+    }
+
+    @Test
+    public void testPojo() throws Exception {
+        startRoutePojo();
+        Assert.assertEquals("hello", tryFailover(POJO_PROXY_ADDRESS));
+    }
+
+    @Test
+    public void testPayload() throws Exception {
+        startRoutePayload();
+        Assert.assertEquals("hello", tryFailover(PAYLOAD_PROXY_ADDRESS));
+    }
+
+    private void startRoutePayload() throws Exception {
+        String proxy = "cxf://" + PAYLOAD_PROXY_ADDRESS + "?wsdlURL=" + SERVICE_ADDRESS + "?wsdl"
+            + "&dataFormat=PAYLOAD";
+
+        // use a non-existent address to trigger fail-over; the URI does not set
+        // synchronous=true, so the producer's async invocation (CAMEL-6609) is exercised
+        String real = "cxf://" + NONE_EXIST_ADDRESS + "?wsdlURL=" + SERVICE_ADDRESS + "?wsdl"
+            + "&dataFormat=PAYLOAD";
+
+        context1 = new DefaultCamelContext();
+        startRoute(context1, proxy, real);
+    }
+
+    private void startRoutePojo() throws Exception {
+        String proxy = "cxf://" + POJO_PROXY_ADDRESS + "?serviceClass=" + "org.apache.camel.component.cxf.HelloService"
+            + "&dataFormat=POJO";
+
+        // use a non-existent address to trigger fail-over
+        String real = "cxf://" + NONE_EXIST_ADDRESS + "?serviceClass=" + "org.apache.camel.component.cxf.HelloService"
+            + "&dataFormat=POJO";
+
+        context2 = new DefaultCamelContext();
+        startRoute(context2, proxy, real);
+    }
+
+    private void startRoute(DefaultCamelContext ctx, final String proxy, final String real) throws Exception {
+        ctx.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                List<String> serviceList = new ArrayList<String>();
+                serviceList.add(SERVICE_ADDRESS);
+
+                RandomStrategy strategy = new RandomStrategy();
+                strategy.setAlternateAddresses(serviceList);
+
+                FailoverFeature ff = new FailoverFeature();
+                ff.setStrategy(strategy);
+
+                CxfEndpoint endpoint = (CxfEndpoint)(endpoint(real));
+                endpoint.getFeatures().add(ff);
+
+                from(proxy).to(endpoint);
+            }
+        });
+        ctx.start();
+    }
+
+    private String tryFailover(String url) {
+        ClientProxyFactoryBean factory = new ClientProxyFactoryBean();
+        factory.setServiceClass(HelloService.class);
+        factory.setAddress(url);
+
+        HelloService client = (HelloService)factory.create();
+        return client.sayHello();
+    }
+}