You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/07/22 16:10:43 UTC
svn commit: r966670 - in /camel/trunk:
components/camel-cxf/src/main/java/org/apache/camel/component/cxf/
components/camel-cxf/src/test/java/org/apache/camel/component/cxf/ parent/
Author: ningjiang
Date: Thu Jul 22 14:10:42 2010
New Revision: 966670
URL: http://svn.apache.org/viewvc?rev=966670&view=rev
Log:
CAMEL-2898 CXF component supports non blocking routing engine
Added:
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java (with props)
Modified:
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java
camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java
camel/trunk/parent/pom.xml
Added: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java?rev=966670&view=auto
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java (added)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java Thu Jul 22 14:10:42 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf;
+
+import java.util.Map;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.ClientCallback;
+import org.apache.cxf.service.model.BindingOperationInfo;
+
+public class CxfClientCallback extends ClientCallback {
+ private final AsyncCallback camelAsyncCallback;
+ private final Exchange camelExchange;
+ private final org.apache.cxf.message.Exchange cxfExchange;
+ private final BindingOperationInfo boi;
+ private final CxfEndpoint endpoint;
+
+ public CxfClientCallback(AsyncCallback callback,
+ Exchange camelExchange,
+ org.apache.cxf.message.Exchange cxfExchange,
+ BindingOperationInfo boi,
+ CxfEndpoint endpoint) {
+ this.camelAsyncCallback = callback;
+ this.camelExchange = camelExchange;
+ this.cxfExchange = cxfExchange;
+ this.boi = boi;
+ this.endpoint = endpoint;
+ }
+
+ public void handleResponse(Map<String, Object> ctx, Object[] res) {
+ super.handleResponse(ctx, res);
+ // bind the CXF response to Camel exchange
+ if (!boi.getOperationInfo().isOneWay()) {
+ // copy the InMessage header to OutMessage header
+ camelExchange.getOut().getHeaders().putAll(camelExchange.getIn().getHeaders());
+ endpoint.getCxfBinding().populateExchangeFromCxfResponse(camelExchange, cxfExchange,
+ ctx);
+ }
+ camelAsyncCallback.done(false);
+ }
+
+ public void handleException(Map<String, Object> ctx, Throwable ex) {
+ super.handleException(ctx, ex);
+ camelExchange.setException(ex);
+ camelAsyncCallback.done(false);
+ }
+
+
+}
Propchange: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfClientCallback.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?rev=966670&r1=966669&r2=966670&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java Thu Jul 22 14:10:42 2010
@@ -27,6 +27,8 @@ import javax.xml.namespace.QName;
import javax.xml.ws.Holder;
import javax.xml.ws.handler.MessageContext.Scope;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultProducer;
@@ -48,9 +50,10 @@ import org.apache.cxf.service.model.Bind
*
* @version $Revision$
*/
-public class CxfProducer extends DefaultProducer {
+public class CxfProducer extends DefaultProducer implements AsyncProcessor {
private static final Log LOG = LogFactory.getLog(CxfProducer.class);
private Client client;
+ private CxfEndpoint endpoint;
/**
* Constructor to create a CxfProducer. It will create a CXF client
@@ -62,8 +65,42 @@ public class CxfProducer extends Default
*/
public CxfProducer(CxfEndpoint endpoint) throws Exception {
super(endpoint);
+ this.endpoint = endpoint;
client = endpoint.createClient();
}
+
+ // As the cxf client async and sync api is implement different,
+ // so we don't delegate the sync process call to the async process
+ public boolean process(Exchange camelExchange, AsyncCallback callback) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Process exchange: " + camelExchange);
+ }
+
+ try {
+ // create CXF exchange
+ ExchangeImpl cxfExchange = new ExchangeImpl();
+
+ // prepare binding operation info
+ BindingOperationInfo boi = prepareBindingOperation(camelExchange, cxfExchange);
+
+ Map<String, Object> invocationContext = new HashMap<String, Object>();
+ Map<String, Object> responseContext = new HashMap<String, Object>();
+ invocationContext.put(Client.RESPONSE_CONTEXT, responseContext);
+ invocationContext.put(Client.REQUEST_CONTEXT, prepareRequest(camelExchange, cxfExchange));
+
+ CxfClientCallback cxfClientCallback = new CxfClientCallback(callback, camelExchange, cxfExchange, boi, endpoint);
+ // send the CXF async request
+ client.invoke(cxfClientCallback, boi, getParams(endpoint, camelExchange),
+ invocationContext, cxfExchange);
+ } catch (Exception ex) {
+ // error occurred before we had a chance to go async
+ // so set exception and invoke callback true
+ camelExchange.setException(ex);
+ callback.done(true);
+ return true;
+ }
+ return false;
+ }
/**
* This processor binds Camel exchange to a CXF exchange and
@@ -78,15 +115,32 @@ public class CxfProducer extends Default
// create CXF exchange
ExchangeImpl cxfExchange = new ExchangeImpl();
- // get CXF binding
- CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
- CxfBinding binding = endpoint.getCxfBinding();
+ // prepare binding operation info
+ BindingOperationInfo boi = prepareBindingOperation(camelExchange, cxfExchange);
+
+ Map<String, Object> invocationContext = new HashMap<String, Object>();
+ Map<String, Object> responseContext = new HashMap<String, Object>();
+ invocationContext.put(Client.RESPONSE_CONTEXT, responseContext);
+ invocationContext.put(Client.REQUEST_CONTEXT, prepareRequest(camelExchange, cxfExchange));
+
+ // send the CXF request
+ client.invoke(boi, getParams(endpoint, camelExchange),
+ invocationContext, cxfExchange);
+
+ // bind the CXF response to Camel exchange
+ if (!boi.getOperationInfo().isOneWay()) {
+ // copy the InMessage header to OutMessage header
+ camelExchange.getOut().getHeaders().putAll(camelExchange.getIn().getHeaders());
+ endpoint.getCxfBinding().populateExchangeFromCxfResponse(camelExchange, cxfExchange,
+ responseContext);
+ }
+ }
+
+ protected Map<String, Object> prepareRequest(Exchange camelExchange, org.apache.cxf.message.Exchange cxfExchange) throws Exception {
// create invocation context
WrappedMessageContext requestContext = new WrappedMessageContext(
new HashMap<String, Object>(), null, Scope.APPLICATION);
- Map<String, Object> responseContext = new HashMap<String, Object>();
-
// set data format mode in exchange
DataFormat dataFormat = endpoint.getDataFormat();
@@ -107,13 +161,26 @@ public class CxfProducer extends Default
+ "=" + true);
}
}
+
+ // bind the request CXF exchange
+ endpoint.getCxfBinding().populateCxfRequestFromExchange(cxfExchange, camelExchange,
+ requestContext);
+
+ // Remove protocol headers from scopes. Otherwise, response headers can be
+ // overwritten by request headers when SOAPHandlerInterceptor tries to create
+ // a wrapped message context by the copyScoped() method.
+ requestContext.getScopes().remove(Message.PROTOCOL_HEADERS);
+ return requestContext.getWrappedMap();
+ }
+
+ private BindingOperationInfo prepareBindingOperation(Exchange camelExchange, org.apache.cxf.message.Exchange cxfExchange) {
// get binding operation info
BindingOperationInfo boi = getBindingOperationInfo(camelExchange);
ObjectHelper.notNull(boi, "BindingOperationInfo");
// keep the message wrapper in PAYLOAD mode
- if (dataFormat == DataFormat.PAYLOAD && boi.isUnwrapped()) {
+ if (endpoint.getDataFormat() == DataFormat.PAYLOAD && boi.isUnwrapped()) {
boi = boi.getWrappedOperation();
cxfExchange.put(BindingOperationInfo.class, boi);
@@ -126,7 +193,7 @@ public class CxfProducer extends Default
}
// Unwrap boi before passing it to make a client call
- if (dataFormat != DataFormat.PAYLOAD && !endpoint.isWrapped() && boi != null) {
+ if (endpoint.getDataFormat() != DataFormat.PAYLOAD && !endpoint.isWrapped() && boi != null) {
if (boi.isUnwrappedCapable()) {
boi = boi.getUnwrappedOperation();
if (LOG.isTraceEnabled()) {
@@ -134,31 +201,7 @@ public class CxfProducer extends Default
}
}
}
-
- // bind the request CXF exchange
- binding.populateCxfRequestFromExchange(cxfExchange, camelExchange,
- requestContext);
-
- // Remove protocol headers from scopes. Otherwise, response headers can be
- // overwritten by request headers when SOAPHandlerInterceptor tries to create
- // a wrapped message context by the copyScoped() method.
- requestContext.getScopes().remove(Message.PROTOCOL_HEADERS);
-
- Map<String, Object> invocationContext = new HashMap<String, Object>();
- invocationContext.put(Client.RESPONSE_CONTEXT, responseContext);
- invocationContext.put(Client.REQUEST_CONTEXT, requestContext.getWrappedMap());
-
- // send the CXF request
- client.invoke(boi, getParams(endpoint, camelExchange),
- invocationContext, cxfExchange);
-
- // bind the CXF response to Camel exchange
- if (!boi.getOperationInfo().isOneWay()) {
- // copy the InMessage header to OutMessage header
- camelExchange.getOut().getHeaders().putAll(camelExchange.getIn().getHeaders());
- binding.populateExchangeFromCxfResponse(camelExchange, cxfExchange,
- responseContext);
- }
+ return boi;
}
private void checkParameterSize(CxfEndpoint endpoint, Exchange exchange, Object[] parameters) {
Modified: camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java?rev=966670&r1=966669&r2=966670&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java (original)
+++ camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRawMessageRouterTest.java Thu Jul 22 14:10:42 2010
@@ -28,6 +28,7 @@ import org.junit.Test;
public class CxfRawMessageRouterTest extends CxfSimpleRouterTest {
private String routerEndpointURI = "cxf://" + ROUTER_ADDRESS + "?" + SERVICE_CLASS + "&dataFormat=MESSAGE";
private String serviceEndpointURI = "cxf://" + SERVICE_ADDRESS + "?" + SERVICE_CLASS + "&dataFormat=MESSAGE";
+
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
@@ -60,6 +61,7 @@ public class CxfRawMessageRouterTest ext
+ "<soap:Body><ns1:echo xmlns:ns1=\"http://cxf.component.camel.apache.org/\">"
+ "<arg0 xmlns=\"http://cxf.component.camel.apache.org/\">hello world</arg0>"
+ "</ns1:echo></soap:Body></soap:Envelope>");
+
}
});
Modified: camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java?rev=966670&r1=966669&r2=966670&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java (original)
+++ camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfTimeoutTest.java Thu Jul 22 14:10:42 2010
@@ -60,11 +60,11 @@ public class CxfTimeoutTest extends Came
protected void sendTimeOutMessage(String endpointUri) throws Exception {
Exchange reply = sendJaxWsMessage(endpointUri);
Exception e = reply.getException();
- assertNotNull("We should get the exception cause here", e.getCause());
- assertTrue("We should get the socket time out exception here", e.getCause().getCause() instanceof SocketTimeoutException);
+ assertNotNull("We should get the exception cause here", e);
+ assertTrue("We should get the socket time out exception here", e instanceof SocketTimeoutException);
}
- protected Exchange sendJaxWsMessage(String endpointUri) {
+ protected Exchange sendJaxWsMessage(String endpointUri) throws InterruptedException {
Exchange exchange = template.send(endpointUri, new Processor() {
public void process(final Exchange exchange) {
final List<String> params = new ArrayList<String>();
Modified: camel/trunk/parent/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/parent/pom.xml?rev=966670&r1=966669&r2=966670&view=diff
==============================================================================
--- camel/trunk/parent/pom.xml (original)
+++ camel/trunk/parent/pom.xml Thu Jul 22 14:10:42 2010
@@ -48,7 +48,7 @@
<commons-collections-version>3.2.1</commons-collections-version>
<commons-pool-version>1.5.4</commons-pool-version>
<commons-dbcp-version>1.3</commons-dbcp-version>
- <cxf-version>2.2.9</cxf-version>
+ <cxf-version>2.2.10-SNAPSHOT</cxf-version>
<derby-version>10.4.2.0</derby-version>
<dozer-version>5.2.2</dozer-version>
<easymock-version>2.5.2</easymock-version>