You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/07/22 16:10:43 UTC

svn commit: r966670 - in /camel/trunk: components/camel-cxf/src/main/java/org/apache/camel/component/cxf/ components/camel-cxf/src/test/java/org/apache/camel/component/cxf/ parent/

Author: ningjiang
Date: Thu Jul 22 14:10:42 2010
New Revision: 966670

URL: http://svn.apache.org/viewvc?rev=966670&view=rev
Log:
CAMEL-2898 CXF component supports non blocking routing engine

Added:
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java   (with props)
Modified:
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
    camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java
    camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java
    camel/trunk/parent/pom.xml

Added: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java?rev=966670&view=auto
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java (added)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java Thu Jul 22 14:10:42 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf;
+
+import java.util.Map;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.ClientCallback;
+import org.apache.cxf.service.model.BindingOperationInfo;
+
+/** CXF client callback that completes the Camel async exchange on a CXF response or failure. */
+public class CxfClientCallback extends ClientCallback {
+    private final AsyncCallback camelAsyncCallback;
+    private final Exchange camelExchange;
+    private final org.apache.cxf.message.Exchange cxfExchange;
+    private final BindingOperationInfo boi;
+    private final CxfEndpoint endpoint;
+    public CxfClientCallback(AsyncCallback callback, Exchange camelExchange,
+            org.apache.cxf.message.Exchange cxfExchange, BindingOperationInfo boi, CxfEndpoint endpoint) {
+        this.camelAsyncCallback = callback;
+        this.camelExchange = camelExchange;
+        this.cxfExchange = cxfExchange;
+        this.boi = boi;
+        this.endpoint = endpoint;
+    }
+
+    /** Binds the CXF response to the Camel exchange; a failure is set on the exchange, not thrown. */
+    public void handleResponse(Map<String, Object> ctx, Object[] res) {
+        try {
+            super.handleResponse(ctx, res);
+            if (!boi.getOperationInfo().isOneWay()) {
+                // copy the InMessage header to OutMessage header
+                camelExchange.getOut().getHeaders().putAll(camelExchange.getIn().getHeaders());
+                endpoint.getCxfBinding().populateExchangeFromCxfResponse(camelExchange, cxfExchange, ctx);
+            }
+        } catch (RuntimeException e) {
+            camelExchange.setException(e);
+        } finally {
+            // always complete the Camel callback, otherwise the async exchange never finishes
+            camelAsyncCallback.done(false);
+        }
+    }
+
+    /** Records the CXF failure on the Camel exchange and completes the Camel callback. */
+    public void handleException(Map<String, Object> ctx, Throwable ex) {
+        super.handleException(ctx, ex);
+        camelExchange.setException(ex);
+        camelAsyncCallback.done(false);
+    }

Propchange: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?rev=966670&r1=966669&r2=966670&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java Thu Jul 22 14:10:42 2010
@@ -27,6 +27,8 @@ import javax.xml.namespace.QName;
 import javax.xml.ws.Holder;
 import javax.xml.ws.handler.MessageContext.Scope;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultProducer;
@@ -48,9 +50,10 @@ import org.apache.cxf.service.model.Bind
  *
  * @version $Revision$
  */
-public class CxfProducer extends DefaultProducer {
+public class CxfProducer extends DefaultProducer implements AsyncProcessor {
     private static final Log LOG = LogFactory.getLog(CxfProducer.class);
     private Client client;
+    private CxfEndpoint endpoint;
 
     /**
      * Constructor to create a CxfProducer.  It will create a CXF client
@@ -62,8 +65,42 @@ public class CxfProducer extends Default
      */
     public CxfProducer(CxfEndpoint endpoint) throws Exception {
         super(endpoint);
+        this.endpoint = endpoint;
         client = endpoint.createClient();
     }
+    
+    // The CXF client's async and sync APIs are implemented differently,
+    // so the sync process call is not delegated to this async process.
+    public boolean process(Exchange camelExchange, AsyncCallback callback) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Process exchange: " + camelExchange);
+        }
+        
+        try {
+            // create CXF exchange
+            ExchangeImpl cxfExchange = new ExchangeImpl();
+            
+            // prepare binding operation info
+            BindingOperationInfo boi = prepareBindingOperation(camelExchange, cxfExchange);
+            
+            Map<String, Object> invocationContext = new HashMap<String, Object>();
+            Map<String, Object> responseContext = new HashMap<String, Object>();
+            invocationContext.put(Client.RESPONSE_CONTEXT, responseContext);
+            invocationContext.put(Client.REQUEST_CONTEXT, prepareRequest(camelExchange, cxfExchange));
+            
+            CxfClientCallback cxfClientCallback = new CxfClientCallback(callback, camelExchange, cxfExchange, boi, endpoint);
+            // send the CXF async request; cxfClientCallback later completes the Camel callback with done(false)
+            client.invoke(cxfClientCallback, boi, getParams(endpoint, camelExchange), 
+                          invocationContext, cxfExchange);
+        } catch (Exception ex) {
+            // an error occurred before we had a chance to go async,
+            // so set the exception, complete the callback with done(true) and return true
+            camelExchange.setException(ex);
+            callback.done(true);
+            return true;
+        }
+        return false;
+    }
 
     /**
      * This processor binds Camel exchange to a CXF exchange and
@@ -78,15 +115,32 @@ public class CxfProducer extends Default
         // create CXF exchange
         ExchangeImpl cxfExchange = new ExchangeImpl();
         
-        // get CXF binding
-        CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
-        CxfBinding binding = endpoint.getCxfBinding();
+        // prepare binding operation info
+        BindingOperationInfo boi = prepareBindingOperation(camelExchange, cxfExchange);
+        
+        Map<String, Object> invocationContext = new HashMap<String, Object>();
+        Map<String, Object> responseContext = new HashMap<String, Object>();
+        invocationContext.put(Client.RESPONSE_CONTEXT, responseContext);
+        invocationContext.put(Client.REQUEST_CONTEXT, prepareRequest(camelExchange, cxfExchange));
+        
+        // send the CXF request
+        client.invoke(boi, getParams(endpoint, camelExchange), 
+                      invocationContext, cxfExchange);
+        
+        // bind the CXF response to Camel exchange
+        if (!boi.getOperationInfo().isOneWay()) {
+            // copy the InMessage header to OutMessage header
+            camelExchange.getOut().getHeaders().putAll(camelExchange.getIn().getHeaders());
+            endpoint.getCxfBinding().populateExchangeFromCxfResponse(camelExchange, cxfExchange,
+                    responseContext);
+        }
+    }
+    
+    protected Map<String, Object> prepareRequest(Exchange camelExchange, org.apache.cxf.message.Exchange cxfExchange) throws Exception {
         
         // create invocation context
         WrappedMessageContext requestContext = new WrappedMessageContext(
                 new HashMap<String, Object>(), null, Scope.APPLICATION);
-        Map<String, Object> responseContext = new HashMap<String, Object>();
-        
         
         // set data format mode in exchange
         DataFormat dataFormat = endpoint.getDataFormat();
@@ -107,13 +161,26 @@ public class CxfProducer extends Default
                         + "=" + true);
             }
         }
+     
+        // bind the request CXF exchange
+        endpoint.getCxfBinding().populateCxfRequestFromExchange(cxfExchange, camelExchange, 
+                requestContext);
+        
+        // Remove protocol headers from scopes.  Otherwise, response headers can be
+        // overwritten by request headers when SOAPHandlerInterceptor tries to create
+        // a wrapped message context by the copyScoped() method.
+        requestContext.getScopes().remove(Message.PROTOCOL_HEADERS);
         
+        return requestContext.getWrappedMap();
+    }
+    
+    private BindingOperationInfo prepareBindingOperation(Exchange camelExchange, org.apache.cxf.message.Exchange cxfExchange) {
         // get binding operation info
         BindingOperationInfo boi = getBindingOperationInfo(camelExchange);
         ObjectHelper.notNull(boi, "BindingOperationInfo");
         
         // keep the message wrapper in PAYLOAD mode
-        if (dataFormat == DataFormat.PAYLOAD && boi.isUnwrapped()) {
+        if (endpoint.getDataFormat() == DataFormat.PAYLOAD && boi.isUnwrapped()) {
             boi = boi.getWrappedOperation();
             cxfExchange.put(BindingOperationInfo.class, boi);
             
@@ -126,7 +193,7 @@ public class CxfProducer extends Default
         }
 
         // Unwrap boi before passing it to make a client call
-        if (dataFormat != DataFormat.PAYLOAD && !endpoint.isWrapped() && boi != null) {
+        if (endpoint.getDataFormat() != DataFormat.PAYLOAD && !endpoint.isWrapped() && boi != null) {
             if (boi.isUnwrappedCapable()) {
                 boi = boi.getUnwrappedOperation();
                 if (LOG.isTraceEnabled()) {
@@ -134,31 +201,7 @@ public class CxfProducer extends Default
                 }
             }
         }
-     
-        // bind the request CXF exchange
-        binding.populateCxfRequestFromExchange(cxfExchange, camelExchange, 
-                requestContext);
-        
-        // Remove protocol headers from scopes.  Otherwise, response headers can be
-        // overwritten by request headers when SOAPHandlerInterceptor tries to create
-        // a wrapped message context by the copyScoped() method.
-        requestContext.getScopes().remove(Message.PROTOCOL_HEADERS);
-        
-        Map<String, Object> invocationContext = new HashMap<String, Object>();
-        invocationContext.put(Client.RESPONSE_CONTEXT, responseContext);
-        invocationContext.put(Client.REQUEST_CONTEXT, requestContext.getWrappedMap());
-
-        // send the CXF request
-        client.invoke(boi, getParams(endpoint, camelExchange), 
-                invocationContext, cxfExchange);
-        
-        // bind the CXF response to Camel exchange
-        if (!boi.getOperationInfo().isOneWay()) {
-            // copy the InMessage header to OutMessage header
-            camelExchange.getOut().getHeaders().putAll(camelExchange.getIn().getHeaders());
-            binding.populateExchangeFromCxfResponse(camelExchange, cxfExchange,
-                    responseContext);
-        }
+        return  boi;
     }
     
     private void checkParameterSize(CxfEndpoint endpoint, Exchange exchange, Object[] parameters) {

Modified: camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java?rev=966670&r1=966669&r2=966670&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java (original)
+++ camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java Thu Jul 22 14:10:42 2010
@@ -28,6 +28,7 @@ import org.junit.Test;
 public class CxfRawMessageRouterTest extends CxfSimpleRouterTest {
     private String routerEndpointURI = "cxf://" + ROUTER_ADDRESS + "?" + SERVICE_CLASS + "&dataFormat=MESSAGE";
     private String serviceEndpointURI = "cxf://" + SERVICE_ADDRESS + "?" + SERVICE_CLASS + "&dataFormat=MESSAGE";
+    
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
@@ -60,6 +61,7 @@ public class CxfRawMessageRouterTest ext
                                          + "<soap:Body><ns1:echo xmlns:ns1=\"http://cxf.component.camel.apache.org/\">"
                                          + "<arg0 xmlns=\"http://cxf.component.camel.apache.org/\">hello world</arg0>"
                                          + "</ns1:echo></soap:Body></soap:Envelope>");
+                
             }
 
         });

Modified: camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java?rev=966670&r1=966669&r2=966670&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java (original)
+++ camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java Thu Jul 22 14:10:42 2010
@@ -60,11 +60,11 @@ public class CxfTimeoutTest extends Came
     protected void sendTimeOutMessage(String endpointUri) throws Exception {
         Exchange reply = sendJaxWsMessage(endpointUri);
         Exception e = reply.getException();
-        assertNotNull("We should get the exception cause here", e.getCause());
-        assertTrue("We should get the socket time out exception here", e.getCause().getCause() instanceof SocketTimeoutException);
+        assertNotNull("We should get the exception cause here", e);
+        assertTrue("We should get the socket time out exception here", e instanceof SocketTimeoutException);
     }
 
-    protected Exchange sendJaxWsMessage(String endpointUri) {
+    protected Exchange sendJaxWsMessage(String endpointUri) throws InterruptedException {
         Exchange exchange = template.send(endpointUri, new Processor() {
             public void process(final Exchange exchange) {
                 final List<String> params = new ArrayList<String>();

Modified: camel/trunk/parent/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/parent/pom.xml?rev=966670&r1=966669&r2=966670&view=diff
==============================================================================
--- camel/trunk/parent/pom.xml (original)
+++ camel/trunk/parent/pom.xml Thu Jul 22 14:10:42 2010
@@ -48,7 +48,7 @@
     <commons-collections-version>3.2.1</commons-collections-version>
     <commons-pool-version>1.5.4</commons-pool-version>
     <commons-dbcp-version>1.3</commons-dbcp-version>
-    <cxf-version>2.2.9</cxf-version>
+    <cxf-version>2.2.10-SNAPSHOT</cxf-version>
     <derby-version>10.4.2.0</derby-version>
     <dozer-version>5.2.2</dozer-version>
     <easymock-version>2.5.2</easymock-version>