You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@cxf.apache.org by Andrea Smyth <an...@iona.com> on 2006/12/06 12:56:20 UTC

Re: svn commit: r483036 - in /incubator/cxf/trunk: rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ systests/src/test/java/org/apache/cxf/systest/ws/rm/

The workaround consists in pausing and then resuming the interceptor 
chain on a different thread (finally a use case for pause/resume!). This 
is done in the logical addressing handler after it has sent the partial 
response.
Currently the change takes effect only in the supplied system test as 
this test sets the org.apache.cxf.async.oneway.dispatch property on the 
inbound message (the test would fail otherwise).
Can someone more familiar with addressing check this approach and clean 
it up - I won't be working on CXF for the next couple of weeks.
Roughly, what I think needs to be done is
* have the HTTP destination (probably in 
JettyHTTPDestination.serviceRequest) set this property instead
* use the endpoint's executor instead of creating a new thread in 
ContextUtils.rebaseResponse
If the approach is valid, it should be possible to enable the last test 
(testTwowayNonAnonymousNoOffer) in the RM SequenceTest system test and 
fix JIRA CXF-277. Also, at this point the previous test 
(testServerSideSequenceCreation) can be removed and with it method 
testServerSideSequenceCreation in org.apache.cxf.ws.rm.RMManager.
The DecoupledClientServerTest can then also be removed.

Andrea.




andreasmyth@apache.org wrote:

>Author: andreasmyth
>Date: Wed Dec  6 03:13:57 2006
>New Revision: 483036
>
>URL: http://svn.apache.org/viewvc?view=rev&rev=483036
>Log:
>Workaround for problem in Jetty which does not service further requests after a response had been commited and the request marked as handled on one of its threads until this thread has terminated.
>
>Added:
>    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/DecoupledClientServerTest.java   (with props)
>    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/decoupled.xml   (with props)
>Modified:
>    incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java
>    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/GreeterImpl.java
>
>Modified: incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java
>URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java?view=diff&rev=483036&r1=483035&r2=483036
>==============================================================================
>--- incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java (original)
>+++ incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java Wed Dec  6 03:13:57 2006
>@@ -360,6 +360,28 @@
>                     if (fullResponse != null) {
>                         exchange.setOutMessage(fullResponse);
>                     }
>+
>+                    Object obj = inMessage.get("org.apache.cxf.async.oneway.dispatch");
>+                    if (obj != null && Boolean.TRUE.equals(obj)) {
>+                        
>+                        // pause and resume execution of chain on separate thread
>+                        inMessage.getInterceptorChain().pause();
>+
>+                        LOG.info("Resuming execution of interceptor chain on separate thread");
>+                        
>+                        final class FullResponseThread extends Thread {
>+                            Message msg;
>+                            FullResponseThread(Message m) {
>+                                msg = m;
>+                            }
>+                            public void run() {
>+                                msg.getInterceptorChain().resume();
>+                            }
>+                        }
>+    
>+                        Thread t = new FullResponseThread(inMessage);
>+                        t.start();
>+                    }
>                 }
>             } catch (Exception e) {
>                 LOG.log(Level.WARNING, "SERVER_TRANSPORT_REBASE_FAILURE_MSG", e);
>
>Added: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/DecoupledClientServerTest.java
>URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/DecoupledClientServerTest.java?view=auto&rev=483036
>==============================================================================
>--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/DecoupledClientServerTest.java (added)
>+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/DecoupledClientServerTest.java Wed Dec  6 03:13:57 2006
>@@ -0,0 +1,182 @@
>+/**
>+ * Licensed to the Apache Software Foundation (ASF) under one
>+ * or more contributor license agreements. See the NOTICE file
>+ * distributed with this work for additional information
>+ * regarding copyright ownership. The ASF licenses this file
>+ * to you under the Apache License, Version 2.0 (the
>+ * "License"); you may not use this file except in compliance
>+ * with the License. You may obtain a copy of the License at
>+ *
>+ * http://www.apache.org/licenses/LICENSE-2.0
>+ *
>+ * Unless required by applicable law or agreed to in writing,
>+ * software distributed under the License is distributed on an
>+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>+ * KIND, either express or implied. See the License for the
>+ * specific language governing permissions and limitations
>+ * under the License.
>+ */
>+
>+package org.apache.cxf.systest.ws.rm;
>+
>+import java.util.logging.Logger;
>+
>+import javax.xml.ws.Endpoint;
>+
>+import junit.framework.Test;
>+import junit.framework.TestSuite;
>+
>+import org.apache.cxf.Bus;
>+import org.apache.cxf.bus.spring.SpringBusFactory;
>+import org.apache.cxf.greeter_control.Greeter;
>+import org.apache.cxf.greeter_control.GreeterService;
>+import org.apache.cxf.interceptor.Fault;
>+import org.apache.cxf.interceptor.LoggingInInterceptor;
>+import org.apache.cxf.interceptor.LoggingOutInterceptor;
>+import org.apache.cxf.message.Message;
>+import org.apache.cxf.phase.AbstractPhaseInterceptor;
>+import org.apache.cxf.phase.Phase;
>+import org.apache.cxf.systest.common.ClientServerSetupBase;
>+import org.apache.cxf.systest.common.ClientServerTestBase;
>+import org.apache.cxf.systest.common.TestServerBase;
>+import org.apache.cxf.ws.addressing.MAPAggregator;
>+
>+
>+/**
>+ * Tests the addition of WS-RM properties to application messages and the
>+ * exchange of WS-RM protocol messages.
>+ */
>+public class DecoupledClientServerTest extends ClientServerTestBase {
>+
>+    private static final Logger LOG = Logger.getLogger(DecoupledClientServerTest.class.getName());
>+    private Bus bus;
>+
>+    public static class Server extends TestServerBase {
>+        
>+        protected void run()  {            
>+            SpringBusFactory bf = new SpringBusFactory();
>+            Bus bus = bf.createBus("/org/apache/cxf/systest/ws/rm/decoupled.xml");
>+            bf.setDefaultBus(bus);
>+            LoggingInInterceptor in = new LoggingInInterceptor();
>+            bus.getInInterceptors().add(in);
>+            bus.getInFaultInterceptors().add(in);
>+            LoggingOutInterceptor out = new LoggingOutInterceptor();
>+            bus.getOutInterceptors().add(out);
>+            bus.getOutFaultInterceptors().add(out);
>+            
>+            class RebasedResponseThreadInterceptor extends AbstractPhaseInterceptor<Message> {
>+                RebasedResponseThreadInterceptor() {
>+                    addBefore(MAPAggregator.class.getName());
>+                    setPhase(Phase.PRE_LOGICAL);
>+                }
>+
>+                public void handleMessage(Message message) throws Fault {
>+                    message.put("org.apache.cxf.async.oneway.dispatch", Boolean.TRUE);
>+                }
>+                
>+            }
>+            
>+            bus.getInInterceptors().add(new RebasedResponseThreadInterceptor());
>+            
>+            GreeterImpl implementor = new GreeterImpl();
>+            implementor.setDelay(8000);
>+            String address = "http://localhost:9020/SoapContext/GreeterPort";
>+            Endpoint.publish(address, implementor);
>+            LOG.info("Published greeter endpoint.");
>+        }
>+        
>+
>+        public static void main(String[] args) {
>+            try { 
>+                Server s = new Server(); 
>+                s.start();
>+            } catch (Exception ex) {
>+                ex.printStackTrace();
>+                System.exit(-1);
>+            } finally { 
>+                System.out.println("done!");
>+            }
>+        }
>+    }    
>+    
>+    public static Test suite() throws Exception {
>+        TestSuite suite = new TestSuite(DecoupledClientServerTest.class);
>+        return new ClientServerSetupBase(suite) {
>+            public void startServers() throws Exception {
>+                assertTrue("server did not launch correctly", launchServer(Server.class));
>+            }
>+            
>+            public void setUp() throws Exception {
>+                startServers();
>+                LOG.fine("Started server.");  
>+            }
>+        };
>+    }
>+    
>+    public void tearDown() {
>+        bus.shutdown(true);
>+        System.getProperties().remove("jetty.workaround");
>+    }
>+    
>+    public void testDecoupled() throws Exception {
>+        SpringBusFactory bf = new SpringBusFactory();
>+        bus = bf.createBus("/org/apache/cxf/systest/ws/rm/decoupled.xml");
>+        bf.setDefaultBus(bus);
>+        LoggingInInterceptor in = new LoggingInInterceptor();
>+        bus.getInInterceptors().add(in);
>+        bus.getInFaultInterceptors().add(in);
>+        LoggingOutInterceptor out = new LoggingOutInterceptor();
>+        bus.getOutInterceptors().add(out);
>+        bus.getOutFaultInterceptors().add(out);
>+        
>+        class RebasedResponseThreadInterceptor extends AbstractPhaseInterceptor<Message> {
>+            RebasedResponseThreadInterceptor() {
>+                super.addAfter(MAPAggregator.class.getName());
>+            }
>+
>+            public void handleMessage(Message message) throws Fault {
>+                message.put("org.apache.cxf.async.oneway.dispatch", Boolean.TRUE);
>+            }
>+            
>+        }
>+   
>+        System.setProperty("jetty.workaround", "true");
>+        
>+        GreeterService gs = new GreeterService();
>+        final Greeter greeter = gs.getGreeterPort();
>+        LOG.fine("Created greeter client.");
>+        
>+        class TwowayThread extends Thread {
>+
>+            String response;
>+            
>+            @Override
>+            public void run() {
>+                response = greeter.greetMe("twoway");
>+            }
>+   
>+        }
>+        
>+        TwowayThread t = new TwowayThread();    
>+        t.start();
>+        
>+        // allow for partial response to twoway request to arrive
>+        
>+        long wait = 3000;
>+        while (wait > 0) {
>+            long start = System.currentTimeMillis();
>+            try {
>+                Thread.sleep(wait);
>+            } catch (InterruptedException ex) {
>+                // ignore
>+            }
>+            wait -= System.currentTimeMillis() - start;
>+        }
>+
>+        greeter.greetMeOneWay("oneway");
>+        
>+        t.join();
>+        assertEquals("Unexpected response to twoway request", "oneway", t.response);
>+        
>+    }
>+}
>
>Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/DecoupledClientServerTest.java
>------------------------------------------------------------------------------
>    svn:eol-style = native
>
>Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/DecoupledClientServerTest.java
>------------------------------------------------------------------------------
>    svn:keywords = Rev Date
>
>Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/GreeterImpl.java
>URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/GreeterImpl.java?view=diff&rev=483036&r1=483035&r2=483036
>==============================================================================
>--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/GreeterImpl.java (original)
>+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/GreeterImpl.java Wed Dec  6 03:13:57 2006
>@@ -44,6 +44,7 @@
> 
>     private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
>     private long delay;
>+    private String lastOnewayArg;
>      
>     public long getDelay() {
>         return delay;
>@@ -54,8 +55,7 @@
>     }
> 
>     public String greetMe(String arg0) {
>-        LOG.fine("Executing operation greetMe with parameter: " + arg0);
>-        String result = arg0.toUpperCase();
>+        LOG.fine("Executing operation greetMe with parameter: " + arg0);        
>         if (delay > 0) {
>             try {
>                 Thread.sleep(delay);
>@@ -63,6 +63,10 @@
>                 // ignore
>             }
>         }
>+        String result = null;
>+        synchronized (this) {
>+            result = null == lastOnewayArg ? arg0.toUpperCase() : lastOnewayArg;
>+        }
>         LOG.fine("returning: " + result);
>         return result;
>     }
>@@ -78,6 +82,9 @@
>     }
> 
>     public void greetMeOneWay(String arg0) {
>+        synchronized (this) {
>+            lastOnewayArg = arg0;
>+        }
>         LOG.fine("Executing operation greetMeOneWay with parameter: " + arg0);
>     }
> 
>
>Added: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/decoupled.xml
>URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/decoupled.xml?view=auto&rev=483036
>==============================================================================
>--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/decoupled.xml (added)
>+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/decoupled.xml Wed Dec  6 03:13:57 2006
>@@ -0,0 +1,68 @@
>+<?xml version="1.0" encoding="UTF-8"?>
>+<!--
>+  Licensed to the Apache Software Foundation (ASF) under one
>+  or more contributor license agreements. See the NOTICE file
>+  distributed with this work for additional information
>+  regarding copyright ownership. The ASF licenses this file
>+  to you under the Apache License, Version 2.0 (the
>+  "License"); you may not use this file except in compliance
>+  with the License. You may obtain a copy of the License at
>+ 
>+  http://www.apache.org/licenses/LICENSE-2.0
>+ 
>+  Unless required by applicable law or agreed to in writing,
>+  software distributed under the License is distributed on an
>+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>+  KIND, either express or implied. See the License for the
>+  specific language governing permissions and limitations
>+  under the License.
>+-->
>+<beans xmlns="http://www.springframework.org/schema/beans"
>+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>+       xmlns:wsrm-mgmt="http://cxf.apache.org/ws/rm/manager"
>+       xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
>+       xmlns:http-conf="http://cxf.apache.org/transports/http/configuration"
>+       xsi:schemaLocation="
>+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
>+    
>+    <bean name="{http://cxf.apache.org/greeter_control}GreeterPort.http-conduit" abstract="true">
>+        <property name="client">
>+            <value>
>+                <http-conf:client DecoupledEndpoint="http://localhost:9995/decoupled_endpoint"/>
>+            </value>
>+        </property>
>+    </bean>
>+    
>+    <bean id="mapAggregator" class="org.apache.cxf.ws.addressing.MAPAggregator"/>
>+    <bean id="mapCodec" class="org.apache.cxf.ws.addressing.soap.MAPCodec"/>
>+
>+    <!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
>+
>+    <bean id="cxf" class="org.apache.cxf.bus.spring.SpringBusImpl">
>+        <property name="inInterceptors">
>+            <list>
>+                <ref bean="mapAggregator"/>
>+                <ref bean="mapCodec"/>
>+            </list>
>+        </property>
>+        <property name="inFaultInterceptors">
>+            <list>
>+                <ref bean="mapAggregator"/>
>+                <ref bean="mapCodec"/>
>+            </list>
>+        </property>
>+        <property name="outInterceptors">
>+            <list>
>+                <ref bean="mapAggregator"/>
>+                <ref bean="mapCodec"/>
>+            </list>
>+        </property>
>+        <property name="outFaultInterceptors">
>+            <list>
>+                <ref bean="mapAggregator"/>
>+                <ref bean="mapCodec"/>
>+            </list>
>+        </property>
>+    </bean>
>+
>+</beans>
>\ No newline at end of file
>
>Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/decoupled.xml
>------------------------------------------------------------------------------
>    svn:eol-style = native
>
>Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/decoupled.xml
>------------------------------------------------------------------------------
>    svn:keywords = Rev Date
>
>Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/decoupled.xml
>------------------------------------------------------------------------------
>    svn:mime-type = text/xml
>
>
>  
>