You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2010/03/26 09:38:39 UTC

svn commit: r927731 - in /servicemix/components/bindings/servicemix-cxf-bc/trunk/src: main/java/org/apache/servicemix/cxfbc/ test/java/org/apache/servicemix/cxfbc/ws/rm/ test/resources/org/apache/servicemix/cxfbc/ws/rm/

Author: ffang
Date: Fri Mar 26 08:38:39 2010
New Revision: 927731

URL: http://svn.apache.org/viewvc?rev=927731&view=rev
Log:
[SMXCOM-728]some ws-rm related incoming message like createsequence/AcknowladgeSequence didn't go into correct observer

Modified:
    servicemix/components/bindings/servicemix-cxf-bc/trunk/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java
    servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMFeatureTest.java
    servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMProviderTest.java
    servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/resources/org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml

Modified: servicemix/components/bindings/servicemix-cxf-bc/trunk/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-cxf-bc/trunk/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java?rev=927731&r1=927730&r2=927731&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-cxf-bc/trunk/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java (original)
+++ servicemix/components/bindings/servicemix-cxf-bc/trunk/src/main/java/org/apache/servicemix/cxfbc/CxfBcProviderMessageObserver.java Fri Mar 26 08:38:39 2010
@@ -19,7 +19,9 @@ package org.apache.servicemix.cxfbc;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import javax.jbi.messaging.ExchangeStatus;
@@ -30,7 +32,9 @@ import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.bind.JAXBException;
 import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamReader;
 import javax.xml.transform.Source;
 
 import org.apache.cxf.Bus;
@@ -51,10 +55,18 @@ import org.apache.cxf.phase.PhaseInterce
 import org.apache.cxf.phase.PhaseManager;
 import org.apache.cxf.service.model.BindingOperationInfo;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.staxutils.StaxUtils;
 import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.Names;
+import org.apache.cxf.ws.addressing.RelatesToType;
+import org.apache.cxf.ws.addressing.soap.MAPCodec;
 import org.apache.servicemix.common.JbiConstants;
 import org.apache.servicemix.cxfbc.interceptors.JbiInWsdl1Interceptor;
 import org.apache.servicemix.cxfbc.interceptors.SchemaValidationInInterceptor;
+import org.xml.sax.SAXException;
+
+import com.sun.xml.bind.v2.runtime.reflect.ListIterator;
 
 public class CxfBcProviderMessageObserver implements MessageObserver {
     ByteArrayOutputStream response = new ByteArrayOutputStream();
@@ -65,6 +77,8 @@ public class CxfBcProviderMessageObserve
 
 
     private CxfBcProvider providerEndpoint;
+    
+    private MessageObserver sharedMessageObserver;
 
     public CxfBcProviderMessageObserver(CxfBcProvider providerEndpoint) {
         this.providerEndpoint = providerEndpoint;
@@ -85,28 +99,6 @@ public class CxfBcProviderMessageObserve
 
     public void onMessage(Message message) {
         try {
-            MessageExchange messageExchange = message.getExchange().get(MessageExchange.class);
-            if (messageExchange == null) {
-                // probably, that's a WS-RM Response; use the messageObserver defined in exchange
-                MessageObserver messageObserver = message.getExchange().get(MessageObserver.class);
-                if (messageObserver != null) {
-                    messageObserver.onMessage(message);
-                    return;
-                }
-            }
-            if (messageExchange != null && messageExchange.getStatus() != ExchangeStatus.ACTIVE) {
-                return;
-            }
-
-                       
-            contentType = (String) message.get(Message.CONTENT_TYPE);
-            SoapMessage soapMessage = 
-                (SoapMessage) this.providerEndpoint.getCxfEndpoint().getBinding().createMessage(message);
-            
-            
-            soapMessage
-                    .put(org.apache.cxf.message.Message.REQUESTOR_ROLE, true);
-            
             // create Interceptor chain
 
             PhaseManager pm = providerEndpoint.getBus().getExtension(
@@ -116,7 +108,7 @@ public class CxfBcProviderMessageObserve
             inList.add(new MustUnderstandInterceptor());
             inList.add(new StaxInInterceptor());
             inList.add(new JbiInWsdl1Interceptor(this.providerEndpoint.isUseJBIWrapper(),
-            		this.providerEndpoint.isUseSOAPEnvelope()));
+                    this.providerEndpoint.isUseSOAPEnvelope()));
             if (this.providerEndpoint.isSchemaValidationEnabled()) {
                 inList.add(new SchemaValidationInInterceptor(this.providerEndpoint.isUseJBIWrapper(),
                         this.providerEndpoint.isUseSOAPEnvelope()));
@@ -126,7 +118,47 @@ public class CxfBcProviderMessageObserve
             inChain.add(providerEndpoint.getBus().getInInterceptors());
             inChain.add(inList);
             inChain.add(providerEndpoint.getInInterceptors());
+                   
+            contentType = (String) message.get(Message.CONTENT_TYPE);
+            SoapMessage soapMessage = 
+                (SoapMessage) this.providerEndpoint.getCxfEndpoint().getBinding().createMessage(message);
+                    
+            soapMessage
+                    .put(org.apache.cxf.message.Message.REQUESTOR_ROLE, true);
             soapMessage.setInterceptorChain(inChain);
+            MessageExchange messageExchange = soapMessage.getExchange().get(MessageExchange.class);
+            if (messageExchange == null) {
+                // probably, that's a WS-RM Response; use the messageObserver defined in exchange
+                MessageObserver messageObserver = message.getExchange().get(MessageObserver.class);
+                if (messageObserver != null) {
+                    messageObserver.onMessage(message);
+                    return;
+                } else {
+                    //decoupled endpoint case we need try to restore the exchange first;
+                    Exchange exchange = restoreExchange(soapMessage);
+                    if (exchange != null) {
+                        MessageObserver rmMessageObserver = exchange.get(MessageObserver.class);
+                        if (rmMessageObserver != null) {
+                            //means it createsequence messagee
+                            sharedMessageObserver = rmMessageObserver;
+                            rmMessageObserver.onMessage(soapMessage);
+                            return;
+                        }
+                    } else {
+                        //means it acknowlagement message
+                        if (sharedMessageObserver != null) {
+                            sharedMessageObserver.onMessage(soapMessage);
+                            return;
+                        }
+                    }
+                }
+            }
+            if (messageExchange != null && messageExchange.getStatus() != ExchangeStatus.ACTIVE) {
+                return;
+            }
+                      
+            
+            
             inChain.doIntercept(soapMessage);
             closeConnectionStream(soapMessage);
             if (soapMessage.getContent(Exception.class) != null || soapMessage.getContent(Source.class) == null) {    
@@ -141,6 +173,10 @@ public class CxfBcProviderMessageObserve
             }
           
             messageExchange = soapMessage.getExchange().get(MessageExchange.class);
+            if (isPartialResponse(message)) {
+                //partial response for origianl channel when use decoupled endpoint
+                return;
+            }
             if (soapMessage.getExchange().get(BindingOperationInfo.class).getOperationInfo().isOneWay()) {
                 messageExchange.setStatus(ExchangeStatus.DONE);
             } else if (soapMessage.get("jbiFault") != null
@@ -198,17 +234,56 @@ public class CxfBcProviderMessageObserve
         }
     }
 
+    private Exchange restoreExchange(SoapMessage message) throws IOException, SAXException, JAXBException {
+        InputStream is = message.getContent(InputStream.class);
+        //cache the message
+        CachedOutputStream bos = new CachedOutputStream();
+        IOUtils.copy(is, bos);
+        bos.flush();
+        is.close();
+        message.setContent(InputStream.class, bos.getInputStream());
+        ReadHeadersInterceptor readHeadersInterceptor = 
+                new ReadHeadersInterceptor(this.providerEndpoint.getBus());
+        readHeadersInterceptor.handleMessage(message);
+        for (Interceptor<?> interceptor : this.providerEndpoint.getBus().getOutInterceptors()) {
+            if (interceptor.getClass().getName().equals("org.apache.cxf.ws.addressing.soap.MAPCodec")) {
+                MAPCodec mapCodec = (MAPCodec) interceptor;
+                AddressingProperties maps = mapCodec.unmarshalMAPs(message);
+                if (maps != null
+                    && maps.getRelatesTo() != null
+                    && isRelationshipReply(maps.getRelatesTo())) { 
+                    Exchange correlatedExchange =
+                            mapCodec.getUncorrelatedExchanges().get(maps.getRelatesTo().getValue());
+                    message.setContent(InputStream.class, bos.getInputStream());
+                    bos.close();
+                    XMLStreamReader xmlReader = 
+                        StaxUtils.createXMLStreamReader(message.getContent(InputStream.class));
+                    message.setContent(XMLStreamReader.class, xmlReader);       
+                    message.setContent(InputStream.class, bos.getInputStream());
+                    return correlatedExchange;
+                    
+                }
+                
+                
+            }
+        }
+        message.setContent(InputStream.class, bos.getInputStream());
+        bos.close();
+        XMLStreamReader xmlReader = 
+            StaxUtils.createXMLStreamReader(message.getContent(InputStream.class));
+        message.setContent(XMLStreamReader.class, xmlReader);       
+        message.setContent(InputStream.class, bos.getInputStream());
+        return null;
+    }
+
     private void closeConnectionStream(SoapMessage soapMessage) throws IOException {
         InputStream is = soapMessage.getContent(InputStream.class);
         if (is != null) {
             CachedOutputStream bos = new CachedOutputStream();
             IOUtils.copy(is, bos);
-
             bos.flush();
             is.close();
-
             soapMessage.setContent(InputStream.class, bos.getInputStream());
-
             bos.close();
         }
 
@@ -224,4 +299,16 @@ public class CxfBcProviderMessageObserve
         }
     }
 
+    private boolean isRelationshipReply(RelatesToType relatesTo) {
+        return Names.WSA_RELATIONSHIP_REPLY.equals(relatesTo.getRelationshipType());
+    }
+    
+    private boolean isPartialResponse(Message in) {
+        if (in.get(Message.RESPONSE_CODE) != null) {
+            return in.get(Message.RESPONSE_CODE).equals(HttpURLConnection.HTTP_ACCEPTED);
+        } else {
+            return false;
+        }
+    }
+
 }

Modified: servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMFeatureTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMFeatureTest.java?rev=927731&r1=927730&r2=927731&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMFeatureTest.java (original)
+++ servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMFeatureTest.java Fri Mar 26 08:38:39 2010
@@ -82,7 +82,7 @@ public class CxfBcRMFeatureTest extends 
         jbi.activateComponent(echo, "echo");
 
         SpringBusFactory bf = new SpringBusFactory();
-        Bus clientBus = bf.createBus("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+        Bus clientBus = bf.createBus("org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml");
         BusFactory.setDefaultBus(clientBus);
         URL wsdl = getClass().getResource("/HelloWorld-DOC.wsdl");
         assertNotNull(wsdl);

Modified: servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMProviderTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMProviderTest.java?rev=927731&r1=927730&r2=927731&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMProviderTest.java (original)
+++ servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/java/org/apache/servicemix/cxfbc/ws/rm/CxfBcRMProviderTest.java Fri Mar 26 08:38:39 2010
@@ -74,7 +74,7 @@ public class CxfBcRMProviderTest extends
         //start external service
         if (withRM) {
             SpringBusFactory bf = new SpringBusFactory();
-            Bus serverBus = bf.createBus("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+            Bus serverBus = bf.createBus("org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml");
             BusFactory.setDefaultBus(serverBus);
         }
         factory = new JaxWsServerFactoryBean();
@@ -98,13 +98,14 @@ public class CxfBcRMProviderTest extends
         CxfBcComponent comp = new CxfBcComponent();
         CxfBcProvider ep = new CxfBcProvider();
         if (withRM) {
-            ep.setBusCfg("org/apache/servicemix/cxfbc/ws/rm/rminterceptors.xml");
+            ep.setBusCfg("org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml");
         }
         ep.setWsdl(new ClassPathResource("/wsdl/calculator.wsdl"));
         ep.setLocationURI(new URI("http://localhost:9001/providertest"));
         ep.setEndpoint("CalculatorPort");
         ep.setService(new QName("http://apache.org/cxf/calculator", "CalculatorService"));
         ep.setInterfaceName(new QName("http://apache.org/cxf/calculator", "CalculatorPortType"));
+        ep.setUseJBIWrapper(false);
         comp.setEndpoints(new CxfBcEndpointType[] {ep});
         jbi.activateComponent(comp, "servicemix-cxfbc");
         client = new DefaultServiceMixClient(jbi);
@@ -114,25 +115,21 @@ public class CxfBcRMProviderTest extends
         io.setOperation(new QName("http://apache.org/cxf/calculator", "add"));
         //send message
         io.getInMessage().setContent(new StringSource(
-                "<message xmlns='http://java.sun.com/xml/ns/jbi/wsdl-11-wrapper'>"
-              + "<part>"
-              + "<add xmlns='http://apache.org/cxf/calculator/types'><arg0>10</arg0>"
-              + "<arg1>5</arg1></add>"
-              + "</part>"
-              + "</message>"));
+                "<soap:Envelope xmlns:soap=\"http://www.w3.org/2003/05/soap-envelope\">"
+                +"<soap:Body><add xmlns=\"http://apache.org/cxf/calculator/types\"><arg0>10</arg0><arg1>5</arg1></add></soap:Body></soap:Envelope>"));
         client.sendSync(io);
         client.done(io);
         assertTrue(new SourceTransformer().contentToString(
                 io.getOutMessage()).indexOf("<return>15</return>") >= 0);
-
+        Thread.sleep(30000);
         // Shutdown CXF Service/Endpoint so that next test doesn't fail.
         factory.getBus().shutdown(true);
         // Shutdown jbi
         jbi.shutDown();
     }    
 
-    public void testProvider() throws Exception {
+    public void testDecoupledProvider() throws Exception {
         localTestProvider(true);
     }
-
+    
 }

Modified: servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/resources/org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/resources/org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml?rev=927731&r1=927730&r2=927731&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/resources/org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml (original)
+++ servicemix/components/bindings/servicemix-cxf-bc/trunk/src/test/resources/org/apache/servicemix/cxfbc/ws/rm/rmfeature.xml Fri Mar 26 08:38:39 2010
@@ -48,6 +48,9 @@
       </wsrm-mgr:reliableMessaging>
     </cxf:features>
   </cxf:bus>
+  <http:conduit name="{http://apache.org/cxf/calculator}CalculatorPort.http-conduit">
+    <http:client DecoupledEndpoint="http://localhost:9990/decoupled_endpoint"/>
+  </http:conduit>
 </beans>