You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ff...@apache.org on 2010/09/06 14:37:09 UTC

svn commit: r993010 - in /cxf/trunk: rt/core/src/main/java/org/apache/cxf/transport/ systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/resources/

Author: ffang
Date: Mon Sep  6 12:37:08 2010
New Revision: 993010

URL: http://svn.apache.org/viewvc?rev=993010&view=rev
Log:
[CXF-2975] ChainInitiationObserver lacks synchronization, which could cause continuations to malfunction in some rare scenarios

Added:
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/IncomingMessageCounterInterceptor.java
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ProviderJMSContinuationTest.java
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ProviderServer.java
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/resources/
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/resources/GreetMeDocLiteralResp.xml
Modified:
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java?rev=993010&r1=993009&r2=993010&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java Mon Sep  6 12:37:08 2010
@@ -64,9 +64,11 @@ public class ChainInitiationObserver imp
             
             if (m.getInterceptorChain() instanceof PhaseInterceptorChain) {
                 phaseChain = (PhaseInterceptorChain)m.getInterceptorChain();
-                if (phaseChain.getState() == InterceptorChain.State.PAUSED) {
-                    phaseChain.resume();
-                    return;
+                synchronized (phaseChain) {
+                    if (phaseChain.getState() == InterceptorChain.State.PAUSED) {
+                        phaseChain.resume();
+                        return;
+                    }
                 }
             }
             

Added: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java?rev=993010&view=auto
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java (added)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/HWSoapMessageDocProvider.java Mon Sep  6 12:37:08 2010
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms.continuations;
+
+import java.io.InputStream;
+
+import javax.annotation.Resource;
+import javax.xml.namespace.QName;
+import javax.xml.soap.MessageFactory;
+import javax.xml.soap.SOAPBody;
+import javax.xml.soap.SOAPMessage;
+import javax.xml.ws.Provider;
+import javax.xml.ws.Service;
+import javax.xml.ws.ServiceMode;
+import javax.xml.ws.WebServiceContext;
+import javax.xml.ws.WebServiceProvider;
+import javax.xml.ws.handler.MessageContext;
+import javax.xml.ws.soap.SOAPFaultException;
+
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.helpers.DOMUtils;
+
+/**
+ * {@link Provider} working on whole SOAP messages that exercises continuations:
+ * the first {@link #invoke(SOAPMessage)} for a request suspends the invocation
+ * while a helper thread resumes it, and the re-invocation after the resume
+ * returns the reply chosen by {@link #resumeMessage(SOAPMessage)}.
+ */
+@WebServiceProvider(serviceName = "HelloWorldService", 
+            portName = "HelloWorldPort",
+            targetNamespace = "http://cxf.apache.org/hello_world_jms",
+            wsdlLocation = "/org/apache/cxf/systest/jms/continuations/jms_test.wsdl")
+@ServiceMode(value = Service.Mode.MESSAGE)
+public class HWSoapMessageDocProvider implements Provider<SOAPMessage> {
+
+    private static final QName SAY_HI = new QName("http://apache.org/hello_world_soap_http", "sayHi");
+    private static final QName GREET_ME = new QName("http://apache.org/hello_world_soap_http", "greetMe");
+
+    @Resource 
+    WebServiceContext ctx;
+
+    // NOTE(review): nothing in this class assigns sayHiResponse, so the branches
+    // of resumeMessage that select it return null - confirm whether a sayHi
+    // reply resource was meant to be loaded as well.
+    private SOAPMessage sayHiResponse;
+    private SOAPMessage greetMeResponse;
+
+    /**
+     * Loads the canned greetMe reply from {@code resources/GreetMeDocLiteralResp.xml}.
+     * Loading is best effort: a failure is printed and leaves the reply {@code null}.
+     */
+    public HWSoapMessageDocProvider() {
+        try {
+            MessageFactory factory = MessageFactory.newInstance();
+            InputStream is = getClass().getResourceAsStream("resources/GreetMeDocLiteralResp.xml");
+            if (is == null) {
+                throw new IllegalStateException("resources/GreetMeDocLiteralResp.xml not found");
+            }
+            try {
+                greetMeResponse = factory.createMessage(null, is);
+            } finally {
+                // close the stream even when building the message fails
+                is.close();
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+    }
+
+    /**
+     * Handles a request in passes selected by the state of its continuation.
+     * <ul>
+     * <li>new: start a thread that resumes it, then call {@code suspend(5000)};</li>
+     * <li>neither new nor resumed: fail with "time out";</li>
+     * <li>resumed: return {@link #resumeMessage(SOAPMessage)}.</li>
+     * </ul>
+     *
+     * @param request the incoming SOAP message
+     * @return the reply chosen by {@link #resumeMessage(SOAPMessage)}
+     * @throws RuntimeException if {@code suspend} returns normally, or if the
+     *         continuation is neither new nor resumed
+     */
+    public SOAPMessage invoke(SOAPMessage request) {
+        try {
+            final MessageContext messageContext = ctx.getMessageContext();
+
+            ContinuationProvider contProvider = 
+                (ContinuationProvider) messageContext.get(ContinuationProvider.class.getName());
+            final Continuation continuation = contProvider.getContinuation();
+            synchronized (continuation) {
+                if (continuation.isNew()) {
+
+                    new Thread(new Runnable() {
+
+                        public void run() {
+                            try {
+                                // blocks until invoke() leaves its synchronized block,
+                                // i.e. until the suspend() call below is over
+                                synchronized (continuation) {
+                                    continuation.resume();
+                                }
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                            }
+                        }
+
+                    }).start();
+
+                    continuation.suspend(5000);
+                    // reached only when suspend() returned instead of throwing
+                    throw new RuntimeException("The continuation provider doesn't "
+                            + "support asynchronous continuations");
+
+                } else if (!continuation.isResumed()) {
+                    throw new RuntimeException("time out");
+                } else {
+                    return resumeMessage(request);
+                }
+            }
+        } catch (SuspendedInvocationException e) {
+            // passed on unchanged to whoever called invoke()
+            throw e;
+        } catch (SOAPFaultException e) {
+            throw e;
+        } 
+
+    }
+
+    /**
+     * Chooses the canned reply for a resumed request from its first body element.
+     *
+     * @param request the SOAP message being answered
+     * @return the chosen reply, or {@code null} when the body has no element
+     *         child, the element is neither sayHi nor greetMe, reading the body
+     *         fails, or the chosen reply field was never populated
+     * @throws RuntimeException if the incoming-message count is not 1 or the
+     *         message context holds no WSDL operation
+     */
+    public SOAPMessage resumeMessage(SOAPMessage request) {
+        if (IncomingMessageCounterInterceptor.getMessageCount() != 1) {
+            throw new RuntimeException("IncomingMessageCounterInterceptor get invoked twice");
+        }
+        QName qn = (QName)ctx.getMessageContext().get(MessageContext.WSDL_OPERATION);
+        if (qn == null) {
+            throw new RuntimeException("No Operation Name");
+        }
+
+        SOAPMessage response = null;
+        try {
+            SOAPBody body = request.getSOAPBody();
+            Node n = body.getFirstChild();
+
+            // skip non-element nodes, stopping at the end of the child list
+            while (n != null && n.getNodeType() != Node.ELEMENT_NODE) {
+                n = n.getNextSibling();
+            }
+            // constant-first equals tolerates the null used when no element exists
+            String localName = n == null ? null : n.getLocalName();
+            if (SAY_HI.getLocalPart().equals(localName)) {
+                response = sayHiResponse;
+            } else if (GREET_ME.getLocalPart().equals(localName)) {
+                Element el = DOMUtils.getFirstElement(n);
+                String v = DOMUtils.getContent(el);
+                if (v.contains("Return sayHi")) {
+                    response = sayHiResponse;
+                } else {
+                    response = greetMeResponse;
+                }
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+        return response;
+    }
+}

Added: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/IncomingMessageCounterInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/IncomingMessageCounterInterceptor.java?rev=993010&view=auto
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/IncomingMessageCounterInterceptor.java (added)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/IncomingMessageCounterInterceptor.java Mon Sep  6 12:37:08 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms.continuations;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cxf.interceptor.AttachmentInInterceptor;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+
+/**
+ * Test interceptor that counts how often {@link #handleMessage(Message)} runs.
+ * <p>
+ * The count lives in a static field shared by all instances and is read through
+ * {@link #getMessageCount()}. It is an {@link AtomicInteger} so that increments
+ * are atomic and the value is visible to a reader on another thread.
+ */
+public class IncomingMessageCounterInterceptor extends AbstractPhaseInterceptor<Message> {
+
+    private static final AtomicInteger MESSAGE_COUNT = new AtomicInteger();
+
+    /**
+     * Places the interceptor in {@link Phase#RECEIVE} and adds
+     * {@link AttachmentInInterceptor} to its "before" set.
+     */
+    public IncomingMessageCounterInterceptor() {
+        super(Phase.RECEIVE);
+        getBefore().add(AttachmentInInterceptor.class.getName());
+    }
+
+    /**
+     * Counts one more handled message.
+     *
+     * @param message the message being handled; not inspected
+     */
+    public void handleMessage(Message message) throws Fault {
+        MESSAGE_COUNT.incrementAndGet();
+    }
+
+    /**
+     * @return how many times {@link #handleMessage(Message)} has run so far
+     */
+    public static int getMessageCount() {
+        return MESSAGE_COUNT.get();
+    }
+}

Added: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ProviderJMSContinuationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ProviderJMSContinuationTest.java?rev=993010&view=auto
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ProviderJMSContinuationTest.java (added)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ProviderJMSContinuationTest.java Mon Sep  6 12:37:08 2010
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms.continuations;
+
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.hello_world_jms.HelloWorldPortType;
+import org.apache.cxf.hello_world_jms.HelloWorldService;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.EmbeddedJMSBrokerLauncher;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Client-side system test: launches the embedded JMS broker and
+ * {@link ProviderServer}, then calls greetMe on the HelloWorld port described
+ * by jms_test.wsdl.
+ */
+public class ProviderJMSContinuationTest extends AbstractBusClientServerTestBase {
+    protected static boolean serversStarted;
+    static final String JMS_PORT = EmbeddedJMSBrokerLauncher.PORT;
+    static final String PORT = ProviderServer.PORT;
+
+    /**
+     * Launches the broker and the provider server before the first test only;
+     * later calls return at once because {@code serversStarted} is already set.
+     */
+    @Before
+    public void startServers() throws Exception {
+        if (serversStarted) {
+            return;
+        }
+        Map<String, String> props = new HashMap<String, String>();
+        if (System.getProperty("activemq.store.dir") != null) {
+            props.put("activemq.store.dir", System.getProperty("activemq.store.dir"));
+        }
+        props.put("java.util.logging.config.file", 
+                  System.getProperty("java.util.logging.config.file"));
+
+        assertTrue("server did not launch correctly", 
+                   launchServer(EmbeddedJMSBrokerLauncher.class, props, null));
+
+        assertTrue("server did not launch correctly", 
+                   launchServer(ProviderServer.class, false));
+        serversStarted = true;
+    }
+
+    public URL getWSDLURL(String s) throws Exception {
+        return getClass().getResource(s);
+    }
+    public QName getServiceName(QName q) {
+        return q;
+    }
+    public QName getPortName(QName q) {
+        return q;
+    }
+
+    /**
+     * Invokes greetMe through the provider endpoint. Exceptions are left to
+     * propagate so the test report shows their type and stack trace rather
+     * than only {@code getMessage()}, which may be {@code null}.
+     */
+    @Test
+    public void testProviderContinuation() throws Exception {
+        QName serviceName = getServiceName(new QName("http://cxf.apache.org/hello_world_jms", 
+                             "HelloWorldService"));
+        QName portName = getPortName(
+                new QName("http://cxf.apache.org/hello_world_jms", "HelloWorldPort"));
+        URL wsdl = getWSDLURL("/org/apache/cxf/systest/jms/continuations/jms_test.wsdl");
+        assertNotNull(wsdl);
+        String wsdlString = wsdl.toString();
+        EmbeddedJMSBrokerLauncher.updateWsdlExtensors(getBus(), wsdlString);
+
+        HelloWorldService service = new HelloWorldService(wsdl, serviceName);
+        assertNotNull(service);
+        HelloWorldPortType greeter = service.getPort(portName, HelloWorldPortType.class);
+        greeter.greetMe("ffang");
+    }
+
+}
+

Added: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ProviderServer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ProviderServer.java?rev=993010&view=auto
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ProviderServer.java (added)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/ProviderServer.java Mon Sep  6 12:37:08 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms.continuations;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.jaxws.EndpointImpl;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.testutil.common.EmbeddedJMSBrokerLauncher;
+
+/**
+ * Test server publishing {@link HWSoapMessageDocProvider} with an
+ * {@link IncomingMessageCounterInterceptor} among its in-interceptors.
+ */
+public class ProviderServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(ProviderServer.class);
+
+    /**
+     * Updates the WSDL extensors on the default bus, publishes the provider
+     * and then registers the counting interceptor on the published endpoint.
+     */
+    protected void run() {
+        final String wsdlLocation = "/org/apache/cxf/systest/jms/continuations/jms_test.wsdl";
+        EmbeddedJMSBrokerLauncher.updateWsdlExtensors(BusFactory.getDefaultBus(), wsdlLocation);
+
+        final HWSoapMessageDocProvider provider = new HWSoapMessageDocProvider();
+        final String publishedAt = "http://localhost:" + PORT + "/SoapContext/SoapPort";
+        final EndpointImpl published = (EndpointImpl)Endpoint.publish(publishedAt, provider);
+        published.getInInterceptors().add(new IncomingMessageCounterInterceptor());
+    }
+
+    /**
+     * Command-line entry point. A start-up exception is printed and the JVM
+     * exits with status -1; otherwise "done!" is printed once start() returns.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        try {
+            new ProviderServer().start();
+        } catch (Exception startFailure) {
+            startFailure.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+}

Added: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/resources/GreetMeDocLiteralResp.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/resources/GreetMeDocLiteralResp.xml?rev=993010&view=auto
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/resources/GreetMeDocLiteralResp.xml (added)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/continuations/resources/GreetMeDocLiteralResp.xml Mon Sep  6 12:37:08 2010
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"><soap:Body><ns1:greetMeResponse xmlns:ns1="http://cxf.apache.org/hello_world_jms"><return xmlns:ns2="http://cxf.apache.org/hello_world_jms/types">Hi Fred Ruby</return></ns1:greetMeResponse></soap:Body></soap:Envelope>