You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2013/08/06 13:11:10 UTC

[2/5] git commit: CAMEL-6609 fixed the issue that the CXF FailoverFeature does not take effect when the camel-cxf producer uses async invocation

CAMEL-6609 fixed the issue that the CXF FailoverFeature does not take effect when the camel-cxf producer uses async invocation


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3551f02d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3551f02d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3551f02d

Branch: refs/heads/camel-2.11.x
Commit: 3551f02da7b6e540ec361192aa97837a26658f54
Parents: 9f2ca67
Author: Willem Jiang <ni...@apache.org>
Authored: Tue Aug 6 10:27:52 2013 +0800
Committer: Willem Jiang <ni...@apache.org>
Committed: Tue Aug 6 19:00:51 2013 +0800

----------------------------------------------------------------------
 components/camel-cxf/pom.xml                    |   7 +
 .../camel/component/cxf/CxfClientCallback.java  |  26 +++-
 .../apache/camel/component/cxf/CxfProducer.java |   2 +-
 .../component/cxf/FailOverFeatureTest.java      | 142 +++++++++++++++++++
 4 files changed, 175 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3551f02d/components/camel-cxf/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-cxf/pom.xml b/components/camel-cxf/pom.xml
index ee88a96..ee28da9 100644
--- a/components/camel-cxf/pom.xml
+++ b/components/camel-cxf/pom.xml
@@ -171,6 +171,13 @@
        <scope>test</scope>
     </dependency>
     
+    <!-- test for cxf failover feature -->
+	<dependency>
+		<groupId>org.apache.cxf</groupId>
+		<artifactId>cxf-rt-features-clustering</artifactId>
+		<version>${cxf-version}</version>
+	</dependency>
+    
     <dependency>
         <groupId>org.apache.activemq</groupId>
         <artifactId>activemq-kahadb-store</artifactId>

http://git-wip-us.apache.org/repos/asf/camel/blob/3551f02d/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
index 2e6e60a..a6f936f 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
@@ -20,7 +20,9 @@ import java.util.Map;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.ClientCallback;
+import org.apache.cxf.endpoint.ClientImpl;
 import org.apache.cxf.service.model.BindingOperationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,15 +35,25 @@ public class CxfClientCallback extends ClientCallback {
     private final org.apache.cxf.message.Exchange cxfExchange;
     private final BindingOperationInfo boi;
     private final CxfBinding binding;
+    private final Client client;
     
     public CxfClientCallback(AsyncCallback callback, 
                              Exchange camelExchange,
                              org.apache.cxf.message.Exchange cxfExchange,
                              BindingOperationInfo boi,
                              CxfBinding binding) {
+        this(null, callback, camelExchange, cxfExchange, boi, binding);       
+    }
+    
+    public CxfClientCallback(Client client, AsyncCallback callback, 
+                             Exchange camelExchange,
+                             org.apache.cxf.message.Exchange cxfExchange,
+                             BindingOperationInfo boi,
+                             CxfBinding binding) {
         this.camelAsyncCallback = callback;
         this.camelExchange = camelExchange;
         this.cxfExchange = cxfExchange;
+        this.client = client;
         this.boi = boi;
         this.binding = binding;       
     }
@@ -69,7 +81,19 @@ public class CxfClientCallback extends ClientCallback {
     public void handleException(Map<String, Object> ctx, Throwable ex) {
         try {
             super.handleException(ctx, ex);
-            camelExchange.setException(ex);
+            // need to call the conduitSelector complete method to enable the fail over feature
+            if (client instanceof ClientImpl) {
+                ((ClientImpl)client).getConduitSelector().complete(cxfExchange);
+                ex = cxfExchange.getOutMessage().getContent(Exception.class);
+                if (ex == null && cxfExchange.getInMessage() != null) {
+                    ex = cxfExchange.getInMessage().getContent(Exception.class);
+                }
+                if (ex != null) {
+                    camelExchange.setException(ex);
+                }
+            } else {
+                camelExchange.setException(ex);
+            }
         } finally {
             // copy the context information and 
             // call camel callback

http://git-wip-us.apache.org/repos/asf/camel/blob/3551f02d/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
index 3f1ec4c..b8a803d 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
@@ -106,7 +106,7 @@ public class CxfProducer extends DefaultProducer implements AsyncProcessor {
             invocationContext.put(Client.RESPONSE_CONTEXT, responseContext);
             invocationContext.put(Client.REQUEST_CONTEXT, prepareRequest(camelExchange, cxfExchange));
             
-            CxfClientCallback cxfClientCallback = new CxfClientCallback(callback, camelExchange, cxfExchange, boi, 
+            CxfClientCallback cxfClientCallback = new CxfClientCallback(client, callback, camelExchange, cxfExchange, boi, 
                                                                         endpoint.getCxfBinding());
             // send the CXF async request
             client.invoke(cxfClientCallback, boi, getParams(endpoint, camelExchange), 

http://git-wip-us.apache.org/repos/asf/camel/blob/3551f02d/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java
new file mode 100644
index 0000000..4b7239a
--- /dev/null
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/FailOverFeatureTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.xml.ws.Endpoint;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cxf.CxfEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.cxf.clustering.FailoverFeature;
+import org.apache.cxf.clustering.RandomStrategy;
+import org.apache.cxf.frontend.ClientProxyFactoryBean;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class FailOverFeatureTest {
+    // Each test calls a Camel proxy route whose target endpoint points at NONE_EXIST_ADDRESS and carries a FailoverFeature listing SERVICE_ADDRESS as alternate; the call is expected to return "hello".
+    private static int port1 = CXFTestSupport.getPort1(); 
+    private static int port2 = CXFTestSupport.getPort2();
+    private static int port3 = CXFTestSupport.getPort3();
+    private static int port4 = AvailablePortFinder.getNextAvailable(); // used for NONE_EXIST_ADDRESS; nothing in this class publishes a service on it
+    
+    private static final String SERVICE_ADDRESS = "http://localhost:" + port1 + "/FailOverFeatureTest";
+    private static final String PAYLOAD_PROXY_ADDRESS = "http://localhost:" + port2 + "/FailOverFeatureTest/proxy";
+    private static final String POJO_PROXY_ADDRESS = "http://localhost:" + port3 + "/FailOverFeatureTest/proxy";
+    private static final String NONE_EXIST_ADDRESS = "http://localhost:" + port4 + "/FailOverFeatureTest";
+    private DefaultCamelContext context1;
+    private DefaultCamelContext context2;
+
+    @BeforeClass
+    public static void init() {
+
+        // publish the real web-service at SERVICE_ADDRESS; it is the alternate address the fail-over strategy is given
+        String addr1 = "http://localhost:9001/hello"; // NOTE(review): addr1 is never used
+        ServerFactoryBean factory = new ServerFactoryBean();
+        factory.setAddress(SERVICE_ADDRESS);
+        factory.setServiceBean(new HelloServiceImpl());
+        factory.create();
+    }
+
+    @Test
+    public void testPojo() throws Exception {
+        startRoutePojo();
+        Assert.assertEquals("hello", tryFailover(POJO_PROXY_ADDRESS));
+        if (context2 != null) { // NOTE(review): not reached if the assertion above fails, so the context is left running
+            context2.stop();
+        }
+    }
+
+    @Test
+    public void testPayload() throws Exception {
+        startRoutePayload();
+        Assert.assertEquals("hello", tryFailover(PAYLOAD_PROXY_ADDRESS));
+        if (context1 != null) { // NOTE(review): not reached if the assertion above fails, so the context is left running
+            context1.stop();
+        }
+    }
+
+    private void startRoutePayload() throws Exception {
+        // route entry point called by the test client, PAYLOAD data format, WSDL taken from the published service
+        String proxy = "cxf://" + PAYLOAD_PROXY_ADDRESS + "?wsdlURL=" + SERVICE_ADDRESS + "?wsdl"
+                       + "&dataFormat=PAYLOAD";
+
+        // use a non-existent address to trigger fail-over
+        // another problem is: if synchronous=false fail-over will not happen
+        String real = "cxf://" + NONE_EXIST_ADDRESS + "?wsdlURL=" + SERVICE_ADDRESS + "?wsdl"
+                      + "&dataFormat=PAYLOAD";
+
+        context1 = new DefaultCamelContext();
+        startRoute(context1, proxy, real);
+    }
+
+    private void startRoutePojo() throws Exception {
+        // route entry point called by the test client, POJO data format, described by the HelloService class
+        String proxy = "cxf://" + POJO_PROXY_ADDRESS + "?serviceClass=" + "org.apache.camel.component.cxf.HelloService"
+                       + "&dataFormat=POJO";
+
+        // use a non-existent address to trigger fail-over
+        String real = "cxf://" + NONE_EXIST_ADDRESS + "?serviceClass=" + "org.apache.camel.component.cxf.HelloService"
+                      + "&dataFormat=POJO";
+
+        context2 = new DefaultCamelContext();
+        startRoute(context2, proxy, real);
+    }
+
+    private void startRoute(DefaultCamelContext ctx, final String proxy, final String real) throws Exception {
+        // adds a single route proxy -> real to ctx, with a FailoverFeature installed on the real endpoint, then starts ctx
+        ctx.addRoutes(new RouteBuilder() {
+            public void configure() {
+                String alt = SERVICE_ADDRESS; // the only alternate address offered to the strategy
+
+                List<String> serviceList = new ArrayList<String>();
+                serviceList.add(alt);
+
+                RandomStrategy strategy = new RandomStrategy();
+                strategy.setAlternateAddresses(serviceList);
+
+                FailoverFeature ff = new FailoverFeature();
+                ff.setStrategy(strategy);
+
+                CxfEndpoint endpoint = (CxfEndpoint)(endpoint(real));
+                endpoint.getFeatures().add(ff);
+
+                from(proxy).to(endpoint);
+            }
+        });
+        ctx.start();
+
+    }
+
+    private String tryFailover(String url) {
+        // calls sayHello() through a CXF client proxy for HelloService pointed at the given url and returns the reply
+        ClientProxyFactoryBean factory = new ClientProxyFactoryBean();
+
+        factory.setServiceClass(HelloService.class);
+        factory.setAddress(url);
+
+        HelloService client = (HelloService)factory.create();
+        return client.sayHello();
+    }
+
+}