You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2010/03/26 09:38:39 UTC
svn commit: r927731 - in
/servicemix/components/bindings/servicemix-cxf-bc/trunk/src:
main/java/org/apache/servicemix/cxfbc/
test/java/org/apache/servicemix/cxfbc/ws/rm/
test/resources/org/apache/servicemix/cxfbc/ws/rm/
Author: ffang
Date: Fri Mar 26 08:38:39 2010
New Revision: 927731
URL: http://svn.apache.org/viewvc?rev=927731&view=rev
Log:
[SMXCOM-728] some ws-rm related incoming messages like createsequence/AcknowledgeSequence didn't go to the correct observer
Modified:
servicemix/components/bindings/servicemix-cxf-bc/trunk/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java
servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMFeatureTest.java
servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMProviderTest.java
servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/resources/org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml
Modified: servicemix/components/bindings/servicemix-cxf-bc/trunk/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-cxf-bc/trunk/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java?rev=927731&r1=927730&r2=927731&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-cxf-bc/trunk/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java (original)
+++ servicemix/components/bindings/servicemix-cxf-bc/trunk/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java Fri Mar 26 08:38:39 2010
@@ -19,7 +19,9 @@ package org.apache.servicemix.cxfbc;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.net.HttpURLConnection;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import javax.jbi.messaging.ExchangeStatus;
@@ -30,7 +32,9 @@ import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.bind.JAXBException;
import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.Source;
import org.apache.cxf.Bus;
@@ -51,10 +55,18 @@ import org.apache.cxf.phase.PhaseInterce
import org.apache.cxf.phase.PhaseManager;
import org.apache.cxf.service.model.BindingOperationInfo;
import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.staxutils.StaxUtils;
import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.Names;
+import org.apache.cxf.ws.addressing.RelatesToType;
+import org.apache.cxf.ws.addressing.soap.MAPCodec;
import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.cxfbc.interceptors.JbiInWsdl1Interceptor;
import org.apache.servicemix.cxfbc.interceptors.SchemaValidationInInterceptor;
+import org.xml.sax.SAXException;
+
+import com.sun.xml.bind.v2.runtime.reflect.ListIterator;
public class CxfBcProviderMessageObserver implements MessageObserver {
ByteArrayOutputStream response = new ByteArrayOutputStream();
@@ -65,6 +77,8 @@ public class CxfBcProviderMessageObserve
private CxfBcProvider providerEndpoint;
+
+ private MessageObserver sharedMessageObserver;
public CxfBcProviderMessageObserver(CxfBcProvider providerEndpoint) {
this.providerEndpoint = providerEndpoint;
@@ -85,28 +99,6 @@ public class CxfBcProviderMessageObserve
public void onMessage(Message message) {
try {
- MessageExchange messageExchange = message.getExchange().get(MessageExchange.class);
- if (messageExchange == null) {
- // probably, that's a WS-RM Response; use the messageObserver defined in exchange
- MessageObserver messageObserver = message.getExchange().get(MessageObserver.class);
- if (messageObserver != null) {
- messageObserver.onMessage(message);
- return;
- }
- }
- if (messageExchange != null && messageExchange.getStatus() != ExchangeStatus.ACTIVE) {
- return;
- }
-
-
- contentType = (String) message.get(Message.CONTENT_TYPE);
- SoapMessage soapMessage =
- (SoapMessage) this.providerEndpoint.getCxfEndpoint().getBinding().createMessage(message);
-
-
- soapMessage
- .put(org.apache.cxf.message.Message.REQUESTOR_ROLE, true);
-
// create Interceptor chain
PhaseManager pm = providerEndpoint.getBus().getExtension(
@@ -116,7 +108,7 @@ public class CxfBcProviderMessageObserve
inList.add(new MustUnderstandInterceptor());
inList.add(new StaxInInterceptor());
inList.add(new JbiInWsdl1Interceptor(this.providerEndpoint.isUseJBIWrapper(),
- this.providerEndpoint.isUseSOAPEnvelope()));
+ this.providerEndpoint.isUseSOAPEnvelope()));
if (this.providerEndpoint.isSchemaValidationEnabled()) {
inList.add(new SchemaValidationInInterceptor(this.providerEndpoint.isUseJBIWrapper(),
this.providerEndpoint.isUseSOAPEnvelope()));
@@ -126,7 +118,47 @@ public class CxfBcProviderMessageObserve
inChain.add(providerEndpoint.getBus().getInInterceptors());
inChain.add(inList);
inChain.add(providerEndpoint.getInInterceptors());
+
+ contentType = (String) message.get(Message.CONTENT_TYPE);
+ SoapMessage soapMessage =
+ (SoapMessage) this.providerEndpoint.getCxfEndpoint().getBinding().createMessage(message);
+
+ soapMessage
+ .put(org.apache.cxf.message.Message.REQUESTOR_ROLE, true);
soapMessage.setInterceptorChain(inChain);
+ MessageExchange messageExchange = soapMessage.getExchange().get(MessageExchange.class);
+ if (messageExchange == null) {
+ // probably, that's a WS-RM Response; use the messageObserver defined in exchange
+ MessageObserver messageObserver = message.getExchange().get(MessageObserver.class);
+ if (messageObserver != null) {
+ messageObserver.onMessage(message);
+ return;
+ } else {
+ //decoupled endpoint case: we need to try to restore the exchange first
+ Exchange exchange = restoreExchange(soapMessage);
+ if (exchange != null) {
+ MessageObserver rmMessageObserver = exchange.get(MessageObserver.class);
+ if (rmMessageObserver != null) {
+ //means it is a createsequence message
+ sharedMessageObserver = rmMessageObserver;
+ rmMessageObserver.onMessage(soapMessage);
+ return;
+ }
+ } else {
+ //means it is an acknowledgement message
+ if (sharedMessageObserver != null) {
+ sharedMessageObserver.onMessage(soapMessage);
+ return;
+ }
+ }
+ }
+ }
+ if (messageExchange != null && messageExchange.getStatus() != ExchangeStatus.ACTIVE) {
+ return;
+ }
+
+
+
inChain.doIntercept(soapMessage);
closeConnectionStream(soapMessage);
if (soapMessage.getContent(Exception.class) != null || soapMessage.getContent(Source.class) == null) {
@@ -141,6 +173,10 @@ public class CxfBcProviderMessageObserve
}
messageExchange = soapMessage.getExchange().get(MessageExchange.class);
+ if (isPartialResponse(message)) {
+ //partial response on the original channel when a decoupled endpoint is used
+ return;
+ }
if (soapMessage.getExchange().get(BindingOperationInfo.class).getOperationInfo().isOneWay()) {
messageExchange.setStatus(ExchangeStatus.DONE);
} else if (soapMessage.get("jbiFault") != null
@@ -198,17 +234,56 @@ public class CxfBcProviderMessageObserve
}
}
+ private Exchange restoreExchange(SoapMessage message) throws IOException, SAXException, JAXBException {
+ InputStream is = message.getContent(InputStream.class);
+ //cache the message
+ CachedOutputStream bos = new CachedOutputStream();
+ IOUtils.copy(is, bos);
+ bos.flush();
+ is.close();
+ message.setContent(InputStream.class, bos.getInputStream());
+ ReadHeadersInterceptor readHeadersInterceptor =
+ new ReadHeadersInterceptor(this.providerEndpoint.getBus());
+ readHeadersInterceptor.handleMessage(message);
+ for (Interceptor<?> interceptor : this.providerEndpoint.getBus().getOutInterceptors()) {
+ if (interceptor.getClass().getName().equals("org.apache.cxf.ws.addressing.soap.MAPCodec")) {
+ MAPCodec mapCodec = (MAPCodec) interceptor;
+ AddressingProperties maps = mapCodec.unmarshalMAPs(message);
+ if (maps != null
+ && maps.getRelatesTo() != null
+ && isRelationshipReply(maps.getRelatesTo())) {
+ Exchange correlatedExchange =
+ mapCodec.getUncorrelatedExchanges().get(maps.getRelatesTo().getValue());
+ message.setContent(InputStream.class, bos.getInputStream());
+ bos.close();
+ XMLStreamReader xmlReader =
+ StaxUtils.createXMLStreamReader(message.getContent(InputStream.class));
+ message.setContent(XMLStreamReader.class, xmlReader);
+ message.setContent(InputStream.class, bos.getInputStream());
+ return correlatedExchange;
+
+ }
+
+
+ }
+ }
+ message.setContent(InputStream.class, bos.getInputStream());
+ bos.close();
+ XMLStreamReader xmlReader =
+ StaxUtils.createXMLStreamReader(message.getContent(InputStream.class));
+ message.setContent(XMLStreamReader.class, xmlReader);
+ message.setContent(InputStream.class, bos.getInputStream());
+ return null;
+ }
+
private void closeConnectionStream(SoapMessage soapMessage) throws IOException {
InputStream is = soapMessage.getContent(InputStream.class);
if (is != null) {
CachedOutputStream bos = new CachedOutputStream();
IOUtils.copy(is, bos);
-
bos.flush();
is.close();
-
soapMessage.setContent(InputStream.class, bos.getInputStream());
-
bos.close();
}
@@ -224,4 +299,16 @@ public class CxfBcProviderMessageObserve
}
}
+ private boolean isRelationshipReply(RelatesToType relatesTo) {
+ return Names.WSA_RELATIONSHIP_REPLY.equals(relatesTo.getRelationshipType());
+ }
+
+ private boolean isPartialResponse(Message in) {
+ if (in.get(Message.RESPONSE_CODE) != null) {
+ return in.get(Message.RESPONSE_CODE).equals(HttpURLConnection.HTTP_ACCEPTED);
+ } else {
+ return false;
+ }
+ }
+
}
Modified: servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMFeatureTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMFeatureTest.java?rev=927731&r1=927730&r2=927731&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMFeatureTest.java (original)
+++ servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMFeatureTest.java Fri Mar 26 08:38:39 2010
@@ -82,7 +82,7 @@ public class CxfBcRMFeatureTest extends
jbi.activateComponent(echo, "echo");
SpringBusFactory bf = new SpringBusFactory();
- Bus clientBus = bf.createBus("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+ Bus clientBus = bf.createBus("org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml");
BusFactory.setDefaultBus(clientBus);
URL wsdl = getClass().getResource("/HelloWorld-DOC.wsdl");
assertNotNull(wsdl);
Modified: servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMProviderTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMProviderTest.java?rev=927731&r1=927730&r2=927731&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMProviderTest.java (original)
+++ servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMProviderTest.java Fri Mar 26 08:38:39 2010
@@ -74,7 +74,7 @@ public class CxfBcRMProviderTest extends
//start external service
if (withRM) {
SpringBusFactory bf = new SpringBusFactory();
- Bus serverBus = bf.createBus("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+ Bus serverBus = bf.createBus("org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml");
BusFactory.setDefaultBus(serverBus);
}
factory = new JaxWsServerFactoryBean();
@@ -98,13 +98,14 @@ public class CxfBcRMProviderTest extends
CxfBcComponent comp = new CxfBcComponent();
CxfBcProvider ep = new CxfBcProvider();
if (withRM) {
- ep.setBusCfg("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+ ep.setBusCfg("org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml");
}
ep.setWsdl(new ClassPathResource("/wsdl/calculator.wsdl"));
ep.setLocationURI(new URI("http://localhost:9001/providertest"));
ep.setEndpoint("CalculatorPort");
ep.setService(new QName("http://apache.org/cxf/calculator", "CalculatorService"));
ep.setInterfaceName(new QName("http://apache.org/cxf/calculator", "CalculatorPortType"));
+ ep.setUseJBIWrapper(false);
comp.setEndpoints(new CxfBcEndpointType[] {ep});
jbi.activateComponent(comp, "servicemix-cxfbc");
client = new DefaultServiceMixClient(jbi);
@@ -114,25 +115,21 @@ public class CxfBcRMProviderTest extends
io.setOperation(new QName("http://apache.org/cxf/calculator", "add"));
//send message
io.getInMessage().setContent(new StringSource(
- "<message xmlns='http://java.sun.com/xml/ns/jbi/wsdl-11-wrapper'>"
- + "<part>"
- + "<add xmlns='http://apache.org/cxf/calculator/types'><arg0>10</arg0>"
- + "<arg1>5</arg1></add>"
- + "</part>"
- + "</message>"));
+ "<soap:Envelope xmlns:soap=\"http://www.w3.org/2003/05/soap-envelope\">"
+ +"<soap:Body><add xmlns=\"http://apache.org/cxf/calculator/types\"><arg0>10</arg0><arg1>5</arg1></add></soap:Body></soap:Envelope>"));
client.sendSync(io);
client.done(io);
assertTrue(new SourceTransformer().contentToString(
io.getOutMessage()).indexOf("<return>15</return>") >= 0);
-
+ Thread.sleep(30000);
// Shutdown CXF Service/Endpoint so that next test doesn't fail.
factory.getBus().shutdown(true);
// Shutdown jbi
jbi.shutDown();
}
- public void testProvider() throws Exception {
+ public void testDecoupledProvider() throws Exception {
localTestProvider(true);
}
-
+
}
Modified: servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/resources/org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/resources/org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml?rev=927731&r1=927730&r2=927731&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/resources/org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml (original)
+++ servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/resources/org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml Fri Mar 26 08:38:39 2010
@@ -48,6 +48,9 @@
</wsrm-mgr:reliableMessaging>
</cxf:features>
</cxf:bus>
+ <http:conduit name="{http://apache.org/cxf/calculator}CalculatorPort.http-conduit">
+ <http:client DecoupledEndpoint="http://localhost:9990/decoupled_endpoint"/>
+ </http:conduit>
</beans>