You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2008/11/18 13:27:36 UTC

svn commit: r718565 [1/3] - in /cxf/trunk: api/src/main/java/org/apache/cxf/continuations/ api/src/main/java/org/apache/cxf/phase/ api/src/test/java/org/apache/cxf/continuations/ api/src/test/java/org/apache/cxf/phase/ rt/core/src/main/java/org/apache/...

Author: sergeyb
Date: Tue Nov 18 04:27:34 2008
New Revision: 718565

URL: http://svn.apache.org/viewvc?rev=718565&view=rev
Log:
CXF-1835,CXF-1912: continuations support for HTTP and JMS transports

Added:
    cxf/trunk/api/src/main/java/org/apache/cxf/continuations/
    cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java   (with props)
    cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java   (with props)
    cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java   (with props)
    cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java   (with props)
    cxf/trunk/api/src/test/java/org/apache/cxf/continuations/
    cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java   (with props)
    cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java   (with props)
    cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java   (with props)
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java   (with props)
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java   (with props)
    cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/
    cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java   (with props)
    cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java   (with props)
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java   (with props)
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapperTest.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ControlWorker.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloContinuation.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloContinuationService.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloWorker.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/cxf.xml   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/jaxws-server.xml   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/jetty-engine.xml   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/test.wsdl   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/GreeterImplWithContinuationsJMS.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/JMSContinuationsClientServerTest.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server2.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test.wsdl   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test.wsdl   (with props)
Modified:
    cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
    cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/Messages.properties
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
    cxf/trunk/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JAXWSMethodInvokerTest.java
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/spring/JettyHTTPServerEngineBeanDefinitionParser.java
    cxf/trunk/rt/transports/http-jetty/src/main/resources/schemas/configuration/http-jetty.xsd
    cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java

Added: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+import org.apache.cxf.message.Message;
+
+/**
+ * Utility class which can be used to associate an inbound message, as well as
+ * a user object, if any, with continuation
+ */
+public class ContinuationInfo {
+    private Message currentMessage;
+    private Object userObject;
+    
+    public ContinuationInfo(Message m) {
+        currentMessage = m;
+    }
+    
+    public Message getMessage() {
+        return currentMessage;
+    }
+    
+    public void setUserObject(Object object) {
+        userObject = object;
+    }
+    
+    public Object getUserObject() {
+        return userObject;
+    }
+}

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+/**
+ * Provides transport-neutral support for creating suspended invocation primitives
+ * or continuations   
+ */
+public interface ContinuationProvider {
+    
+    /**
+     * Creates a new continuation or retrieves the existing one
+     * @return transport-neutral ContinuationWrapper
+     */
+    ContinuationWrapper getContinuation();
+}

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+/**
+ * Represents transport-neutral suspended invocation instances 
+ * or continuations
+ */
+public interface ContinuationWrapper {
+    
+    /** 
+     * This method will suspend the request for the timeout or until resume is
+     * called
+     * 
+     * @param timeout. A timeout of < 0 will cause an immediate return.
+     * A timeout of 0 will wait indefinitely.
+     * @return True if resume called or false if timeout.
+     */
+    boolean suspend(long timeout);
+    
+    /** 
+     * Resume a suspended request  
+     */
+    void resume();
+    
+    /** 
+     * Reset the continuation
+     */
+    void reset();
+    
+    /** 
+     * Is this a newly created Continuation.
+     * @return True if the continuation has just been created and has not yet suspended the request.
+     */
+    boolean isNew();
+    
+    /** 
+     * Get the pending status
+     * @return True if the continuation has been suspended.
+     */
+    boolean isPending();
+    
+    /** 
+     * Get the resumed status
+     * @return True if the continuation is has been resumed.
+     */
+    boolean isResumed();
+    
+    /** 
+     * Get arbitrary object associated with the continuation for context
+     * 
+     * @return An arbitrary object associated with the continuation
+     */
+    Object getObject();
+    
+    /** 
+     * Sets arbitrary object associated with the continuation for context
+     * 
+     * @param o An arbitrary object to associate with the continuation
+     */
+    void setObject(Object o);
+}

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+/**
+ * Represents transport-specific exceptions which are used to indicate that
+ * a given invocation was suspended
+ */
+public class SuspendedInvocationException extends RuntimeException {
+    
+    public SuspendedInvocationException(Throwable cause) {
+        super(cause);
+    }
+    
+    public SuspendedInvocationException() {
+    }
+    
+   
+    /**
+     * Returns a transport-specific runtime exception
+     * @return RuntimeException the transport-specific runtime exception, 
+     *         can be null for asynchronous transports
+     */
+    public RuntimeException getRuntimeException() {
+        Throwable ex = getCause();
+        return ex instanceof RuntimeException ? (RuntimeException)ex : null;
+    }
+}

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Tue Nov 18 04:27:34 2008
@@ -31,6 +31,7 @@
 import java.util.logging.Logger;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.interceptor.InterceptorChain;
@@ -138,6 +139,11 @@
         }
     }
     
+    // this method should really be on the InterceptorChain interface
+    public State getState() {
+        return state;
+    }
+    
     public PhaseInterceptorChain cloneChain() {
         return new PhaseInterceptorChain(this);
     }
@@ -218,6 +224,13 @@
                 }
                 //System.out.println("-----------" + currentInterceptor);
                 currentInterceptor.handleMessage(message);
+            } catch (SuspendedInvocationException ex) {
+                // we need to resume from the same interceptor the exception got originated from
+                if (iterator.hasPrevious()) {
+                    iterator.previous();
+                }
+                pause();
+                throw ex;
             } catch (RuntimeException ex) {
                 if (!faultOccurred) {
  

Added: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java (added)
+++ cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ContinuationInfoTest extends Assert {
+
+    @Test
+    public void testContinuationInfo() {
+        Message m = new MessageImpl();
+        ContinuationInfo ci = new ContinuationInfo(m);
+        Object userObject = new Object();
+        ci.setUserObject(userObject);
+        assertSame(m, ci.getMessage());
+        assertSame(userObject, ci.getUserObject());
+    }
+    
+}

Propchange: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java (added)
+++ cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+import org.apache.cxf.common.i18n.UncheckedException;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class SuspendedInvocationExceptionTest extends Assert {
+    
+    @Test
+    public void testValidRuntimeException() {
+        
+        Throwable t = new UncheckedException(new Throwable());
+        SuspendedInvocationException ex = new SuspendedInvocationException(t);
+        
+        assertSame(t, ex.getRuntimeException());
+        assertSame(t, ex.getCause());
+        
+    }
+    
+    @Test
+    public void testNoRuntimeException() {
+        
+        SuspendedInvocationException ex = new SuspendedInvocationException(
+                                              new Throwable());
+        
+        assertNull(ex.getRuntimeException());
+    }
+    
+}

Propchange: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java (original)
+++ cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java Tue Nov 18 04:27:34 2008
@@ -27,7 +27,9 @@
 import java.util.TreeSet;
 
 import org.apache.cxf.common.util.SortedArraySet;
+import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.interceptor.Interceptor;
+import org.apache.cxf.interceptor.InterceptorChain;
 import org.apache.cxf.message.Message;
 import org.easymock.classextension.EasyMock;
 import org.easymock.classextension.IMocksControl;
@@ -67,6 +69,51 @@
     }
 
     @Test
+    public void testState() throws Exception {
+        AbstractPhaseInterceptor p = setUpPhaseInterceptor("phase1", "p1");
+        control.replay();
+        chain.add(p);
+        
+        assertSame("Initial state is State.EXECUTING",
+                   InterceptorChain.State.EXECUTING, chain.getState());
+        chain.pause();
+        assertSame("Pausing chain should lead to State.PAUSED",
+                   InterceptorChain.State.PAUSED, chain.getState());
+        chain.resume();
+        assertSame("Resuming chain should lead to State.COMPLETE",
+                   InterceptorChain.State.COMPLETE, chain.getState());
+        chain.abort();
+        assertSame("Aborting chain should lead to State.ABORTED",
+                   InterceptorChain.State.ABORTED, chain.getState());
+    }
+    
+    @Test
+    public void testSuspendedException() throws Exception {
+        CountingPhaseInterceptor p1 = 
+            new CountingPhaseInterceptor("phase1", "p1");
+        SuspendedInvocationInterceptor p2 = 
+            new SuspendedInvocationInterceptor("phase2", "p2");
+        
+        message.getInterceptorChain();
+        EasyMock.expectLastCall().andReturn(chain).anyTimes();
+
+        control.replay();
+        
+        chain.add(p1);
+        chain.add(p2);
+        try {
+            chain.doIntercept(message);
+            fail("Suspended invocation swallowed");
+        } catch (SuspendedInvocationException ex) {
+            // ignore
+        }
+        
+        assertSame("No previous interceptor selected", p1, chain.iterator().next());
+        assertSame("Suspended invocation should lead to State.PAUSED",
+                   InterceptorChain.State.PAUSED, chain.getState());
+    }
+    
+    @Test
     public void testAddOneInterceptor() throws Exception {
         AbstractPhaseInterceptor p = setUpPhaseInterceptor("phase1", "p1");
         control.replay();
@@ -404,7 +451,7 @@
             invoked++;
         }
     }
-
+    
     public class WrapperingPhaseInterceptor extends CountingPhaseInterceptor {
         public WrapperingPhaseInterceptor(String phase, String id) {
             super(phase, id);
@@ -416,5 +463,15 @@
         }
     }
 
+    public class SuspendedInvocationInterceptor extends AbstractPhaseInterceptor<Message> {
+        
+        public SuspendedInvocationInterceptor(String phase, String id) {
+            super(id, phase);
+        }
+        
+        public void handleMessage(Message m) {
+            throw new SuspendedInvocationException(new Throwable());
+        }
+    }
 
 }

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java Tue Nov 18 04:27:34 2008
@@ -29,6 +29,7 @@
 
 import org.apache.cxf.common.i18n.Message;
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.frontend.MethodDispatcher;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.interceptor.Fault;
@@ -45,7 +46,6 @@
 public abstract class AbstractInvoker implements Invoker {
     private static final Logger LOG = LogUtils.getL7dLogger(AbstractInvoker.class);
     
-    
     public Object invoke(Exchange exchange, Object o) {
 
         final Object serviceObject = getServiceObject(exchange);
@@ -87,11 +87,18 @@
             
             return new MessageContentsList(res);
         } catch (InvocationTargetException e) {
+            
             Throwable t = e.getCause();
+            
             if (t == null) {
                 t = e;
             }
+            
+            checkSuspendedInvocation(exchange, serviceObject, m, params, t);
+            
             exchange.getInMessage().put(FaultMode.class, FaultMode.UNCHECKED_APPLICATION_FAULT);
+            
+            
             for (Class<?> cl : m.getExceptionTypes()) {
                 if (cl.isInstance(t)) {
                     exchange.getInMessage().put(FaultMode.class, 
@@ -105,16 +112,38 @@
                 throw (Fault)t;
             }
             throw createFault(t, m, params, true);
+        } catch (SuspendedInvocationException suspendedEx) {
+            // to avoid duplicating the same log statement
+            checkSuspendedInvocation(exchange, serviceObject, m, params, suspendedEx);
+            // unreachable
+            throw suspendedEx;
         } catch (Fault f) {
             exchange.getInMessage().put(FaultMode.class, FaultMode.UNCHECKED_APPLICATION_FAULT);
             throw f;
         } catch (Exception e) {
+            checkSuspendedInvocation(exchange, serviceObject, m, params, e);
             exchange.getInMessage().put(FaultMode.class, FaultMode.UNCHECKED_APPLICATION_FAULT);
             throw createFault(e, m, params, false);
         }
     }
     
+    protected void checkSuspendedInvocation(Exchange exchange,
+                                            Object serviceObject, 
+                                            Method m, 
+                                            List<Object> params, 
+                                            Throwable t) {
+        if (t instanceof SuspendedInvocationException) {
+            
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, "SUSPENDED_INVOCATION_EXCEPTION", 
+                        new Object[]{serviceObject, m.toString(), params});
+            }
+            throw (SuspendedInvocationException)t;
+        }
+    }
+    
     protected Fault createFault(Throwable ex, Method m, List<Object> params, boolean checked) {
+        
         if (checked) {
             return new Fault(ex);
         } else {

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/Messages.properties
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/Messages.properties?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/Messages.properties (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/Messages.properties Tue Nov 18 04:27:34 2008
@@ -24,4 +24,5 @@
 ILLEGAL_ACCESS=Couldn't access service object.
 CREATE_SERVICE_OBJECT_EXC=Couldn't instantiate service object.
 EXCEPTION_INVOKING_OBJECT={0} while invoking {1} with params {2}.
+SUSPENDED_INVOCATION_EXCEPTION=Invocation of method {1} on object {0} with params {2} has been suspended.
 INVOKING_METHOD=Invoking method {1} on object {0} with params {2}.

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java Tue Nov 18 04:27:34 2008
@@ -29,6 +29,7 @@
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.binding.Binding;
 import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.interceptor.InterceptorChain;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
@@ -55,6 +56,16 @@
         Bus origBus = BusFactory.getThreadDefaultBus(false);
         BusFactory.setThreadDefaultBus(bus);
         try {
+            PhaseInterceptorChain phaseChain = null;
+            
+            if (m.getInterceptorChain() instanceof PhaseInterceptorChain) {
+                phaseChain = (PhaseInterceptorChain)m.getInterceptorChain();
+                if (phaseChain.getState() == InterceptorChain.State.PAUSED) {
+                    phaseChain.resume();
+                    return;
+                }
+            }
+            
             Message message = getBinding().createMessage(m);
             Exchange exchange = message.getExchange();
             if (exchange == null) {
@@ -64,18 +75,18 @@
             setExchangeProperties(exchange, message);
     
             // setup chain
-            PhaseInterceptorChain chain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
+            phaseChain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
                                                          bus.getInInterceptors(),
                                                          endpoint.getService().getInInterceptors(),
                                                          endpoint.getInInterceptors(),
                                                          getBinding().getInInterceptors());
             
             
-            message.setInterceptorChain(chain);
+            message.setInterceptorChain(phaseChain);
             
-            chain.setFaultObserver(endpoint.getOutFaultObserver());
+            phaseChain.setFaultObserver(endpoint.getOutFaultObserver());
            
-            chain.doIntercept(message);
+            phaseChain.doIntercept(message);
         } finally {
             BusFactory.setThreadDefaultBus(origBus);
         }

Added: cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java (added)
+++ cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.phase.PhaseInterceptorChain;
+import org.easymock.classextension.EasyMock;
+import org.easymock.classextension.IMocksControl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChainInitiationObserverTest extends Assert {
+
+    private IMocksControl control;
+    private TestChain chain;
+    private Message message;
+    private ChainInitiationObserver observer;
+    
+    @Before
+    public void setUp() {
+
+        control = EasyMock.createNiceControl();
+        message = control.createMock(Message.class);
+
+        Phase phase1 = new Phase("phase1", 1);
+        SortedSet<Phase> phases = new TreeSet<Phase>();
+        phases.add(phase1);
+        chain = new TestChain(phases);
+        observer = new ChainInitiationObserver(null, BusFactory.getDefaultBus());
+    }
+
+    @After
+    public void tearDown() {
+        control.verify();
+    }
+    
+    @Test
+    public void testPausedChain() {
+        message.getInterceptorChain();
+        EasyMock.expectLastCall().andReturn(chain).times(2);
+        control.replay();
+        
+        observer.onMessage(message);
+        assertTrue(chain.isInvoked());
+    }
+    
+    private static class TestChain extends PhaseInterceptorChain {
+        
+        private boolean invoked;
+        
+        public TestChain(SortedSet<Phase> ps) {
+            super(ps);
+        }
+        
+        @Override
+        public void resume() {
+            invoked = true;
+        }
+        
+        @Override
+        public State getState() {
+            return State.PAUSED;
+        }
+        
+        public boolean isInvoked() {
+            return invoked;
+        }
+    }
+}

Propchange: cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JAXWSMethodInvokerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JAXWSMethodInvokerTest.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JAXWSMethodInvokerTest.java (original)
+++ cxf/trunk/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JAXWSMethodInvokerTest.java Tue Nov 18 04:27:34 2008
@@ -19,14 +19,21 @@
 package org.apache.cxf.jaxws;
 
 
+import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.frontend.MethodDispatcher;
 import org.apache.cxf.jaxws.service.Hello;
 import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageContentsList;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.Service;
 import org.apache.cxf.service.invoker.Factory;
+import org.apache.cxf.service.model.BindingOperationInfo;
 import org.easymock.classextension.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class JAXWSMethodInvokerTest {
+public class JAXWSMethodInvokerTest extends Assert {
     Factory factory = EasyMock.createMock(Factory.class);
     Object target = EasyMock.createMock(Hello.class);
         
@@ -39,9 +46,75 @@
         EasyMock.replay(factory);
         JAXWSMethodInvoker jaxwsMethodInvoker = new JAXWSMethodInvoker(factory);
         Object object = jaxwsMethodInvoker.getServiceObject(ex);
-        Assert.assertEquals("the target object and service object should be equal ", object, target);
+        assertEquals("the target object and service object should be equal ", object, target);
         EasyMock.verify(factory);
     }
         
 
+    @Test
+    public void testSuspendedException() throws Throwable {
+        Exchange ex = EasyMock.createNiceMock(Exchange.class);               
+        
+        Exception originalException = new RuntimeException();
+        ContinuationService serviceObject = 
+            new ContinuationService(originalException);
+        EasyMock.reset(factory);
+        factory.create(ex);
+        EasyMock.expectLastCall().andReturn(serviceObject);
+        factory.release(ex, serviceObject);
+        EasyMock.expectLastCall();
+        EasyMock.replay(factory);
+                        
+        Message inMessage = new MessageImpl();
+        ex.getInMessage();
+        EasyMock.expectLastCall().andReturn(inMessage);
+        inMessage.setExchange(ex);
+        inMessage.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+                
+        BindingOperationInfo boi = EasyMock.createMock(BindingOperationInfo.class);
+        ex.get(BindingOperationInfo.class);
+        EasyMock.expectLastCall().andReturn(boi);
+        
+        Service serviceClass = EasyMock.createMock(Service.class);
+        ex.get(Service.class);
+        EasyMock.expectLastCall().andReturn(serviceClass);
+        
+        MethodDispatcher md = EasyMock.createMock(MethodDispatcher.class);
+        serviceClass.get(MethodDispatcher.class.getName());
+        EasyMock.expectLastCall().andReturn(md);
+        
+        md.getMethod(boi);
+        EasyMock.expectLastCall().andReturn(
+            ContinuationService.class.getMethod("invoke", new Class[]{}));
+        
+        EasyMock.replay(ex);
+        EasyMock.replay(md);
+        EasyMock.replay(serviceClass);
+        
+        JAXWSMethodInvoker jaxwsMethodInvoker = new JAXWSMethodInvoker(factory);
+        try {
+            jaxwsMethodInvoker.invoke(ex, new MessageContentsList(new Object[]{}));
+            fail("Suspended invocation swallowed");
+        } catch (SuspendedInvocationException suspendedEx) {
+            assertSame(suspendedEx, serviceObject.getSuspendedException());
+            assertSame(originalException, suspendedEx.getRuntimeException());
+        }
+        
+    }
+    
+    public static class ContinuationService {
+        private RuntimeException ex;
+        
+        public ContinuationService(Exception throwable) {
+            ex = new SuspendedInvocationException(throwable);
+        }
+        
+        public void invoke() {
+            throw ex;
+        }
+        
+        public Throwable getSuspendedException() {
+            return ex;
+        }
+    }
 }

Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java Tue Nov 18 04:27:34 2008
@@ -33,24 +33,29 @@
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.i18n.Message;
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.continuations.ContinuationInfo;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
 import org.apache.cxf.transport.http.HTTPSession;
+import org.apache.cxf.transport.http_jetty.continuations.JettyContinuationProvider;
 import org.apache.cxf.transports.http.QueryHandler;
 import org.apache.cxf.transports.http.QueryHandlerRegistry;
 import org.apache.cxf.transports.http.StemMatchingQueryHandler;
 import org.mortbay.jetty.HttpConnection;
 import org.mortbay.jetty.Request;
+import org.mortbay.util.ajax.Continuation;
+import org.mortbay.util.ajax.ContinuationSupport;
 
 public class JettyHTTPDestination extends AbstractHTTPDestination {
     
     private static final Logger LOG =
         LogUtils.getL7dLogger(JettyHTTPDestination.class);
 
-    
     protected JettyHTTPServerEngine engine;
     protected JettyHTTPTransportFactory transportFactory;
     protected JettyHTTPServerEngineFactory serverEngineFactory;
@@ -78,6 +83,7 @@
             JettyHTTPTransportFactory ci, 
             EndpointInfo              endpointInfo
     ) throws IOException {
+        
         //Add the defualt port if the address is missing it
         super(b, ci, endpointInfo, true);
         this.transportFactory = ci;
@@ -261,24 +267,39 @@
         throws IOException {
         Request baseRequest = (req instanceof Request) 
             ? (Request)req : HttpConnection.getCurrentConnection().getRequest();
-        try {
-            if (LOG.isLoggable(Level.FINE)) {
-                LOG.fine("Service http request on thread: " + Thread.currentThread());
+        
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Service http request on thread: " + Thread.currentThread());
+        }
+        MessageImpl inMessage = retrieveFromContinuation(req);
+        
+        
+        if (inMessage == null) {
+            
+            inMessage = new MessageImpl();
+            if (engine.getContinuationsEnabled()) {
+                inMessage.put(ContinuationProvider.class.getName(), 
+                          new JettyContinuationProvider(req, inMessage));
             }
-
-            MessageImpl inMessage = new MessageImpl();
+            
             setupMessage(inMessage, context, req, resp);
             
             inMessage.setDestination(this);
-
+    
             ExchangeImpl exchange = new ExchangeImpl();
             exchange.setInMessage(inMessage);
             exchange.setSession(new HTTPSession(req));
-            
-            incomingObserver.onMessage(inMessage);
+        }
 
+        try {    
+            incomingObserver.onMessage(inMessage);
+            
             resp.flushBuffer();
             baseRequest.setHandled(true);
+        } catch (SuspendedInvocationException ex) {
+            throw ex.getRuntimeException();
+        } catch (RuntimeException ex) {
+            throw ex;
         } finally {
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.fine("Finished servicing http request on thread: " + Thread.currentThread());
@@ -286,6 +307,39 @@
         }
     }
 
+    protected MessageImpl retrieveFromContinuation(HttpServletRequest req) {
+        MessageImpl m = null;
+        
+        if (!engine.getContinuationsEnabled()) {
+            return null;
+        }
+        
+        Continuation cont = ContinuationSupport.getContinuation(req, null);
+        synchronized (cont) {
+            Object o = cont.getObject();
+            if (o instanceof ContinuationInfo) {
+                ContinuationInfo ci = (ContinuationInfo)o;
+                m = (MessageImpl)ci.getMessage();
+                
+                // now that we got the message we don't need ContinuationInfo
+                // as we don't know how continuation was suspended, by jetty wrapper
+                // or directly in which (latter) case we need to ensure that an original user object
+                // if any, need to be restored
+                cont.setObject(ci.getUserObject());
+            }
+            if (m == null && !cont.isNew()) {
+                String message = "No message for existing continuation, status : "
+                    + (cont.isPending() ? "Pending" : "Resumed");
+                if (!(o instanceof ContinuationInfo)) {
+                    message += ", ContinuationInfo object is unavailable";
+                }
+                LOG.warning(message);
+            }
+        }
+        
+        return m;
+    }
+    
     @Override
     public void shutdown() {
         transportFactory.removeDestination(endpointInfo);

Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java Tue Nov 18 04:27:34 2008
@@ -92,6 +92,7 @@
     
     private Boolean isSessionSupport = false;
     private Boolean isReuseAddress = true;
+    private Boolean continuationsEnabled = true;
     private int servantCount;
     private Server server;
     private Connector connector;
@@ -99,6 +100,7 @@
     private JettyConnectorFactory connectorFactory;
     private ContextHandlerCollection contexts;
     
+    
     /**
      * This field holds the TLS ServerParameters that are programatically
      * configured. The tlsServerParamers (due to JAXB) holds the struct
@@ -116,7 +118,7 @@
      * has been called.
      */
     private boolean configFinalized;
-    
+        
     /**
      * This constructor is called by the JettyHTTPServerEngineFactory.
      */
@@ -140,6 +142,15 @@
     public void setPort(int p) {
         port = p;
     }
+    
+    public void setContinuationsEnabled(boolean enabled) {
+        continuationsEnabled = enabled;
+    }
+    
+    public boolean getContinuationsEnabled() {
+        return continuationsEnabled;
+    }
+    
     /**
      * The bus.
      */

Added: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java (added)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http_jetty.continuations;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.ContinuationWrapper;
+import org.apache.cxf.message.Message;
+import org.mortbay.util.ajax.Continuation;
+import org.mortbay.util.ajax.ContinuationSupport;
+
+public class JettyContinuationProvider implements ContinuationProvider {
+
+    private HttpServletRequest request;
+    private Message inMessage; 
+    
+    public JettyContinuationProvider(HttpServletRequest req, Message m) {
+        request = req;
+        this.inMessage = m;
+    }
+    
+    public ContinuationWrapper getContinuation() {
+        if (inMessage.getExchange().isOneWay()) {
+            return null;
+        }
+        Continuation cont = ContinuationSupport.getContinuation(request, null);
+        return new JettyContinuationWrapper(cont, inMessage);
+    }
+
+}

Propchange: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java (added)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http_jetty.continuations;
+
+import org.apache.cxf.continuations.ContinuationInfo;
+import org.apache.cxf.continuations.ContinuationWrapper;
+import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.message.Message;
+import org.mortbay.jetty.RetryRequest;
+import org.mortbay.util.ajax.Continuation;
+
+public class JettyContinuationWrapper implements ContinuationWrapper {
+
+    private Continuation continuation;
+    private Message message;
+    
+    
+    public JettyContinuationWrapper(Continuation c, Message m) {
+        continuation = c; 
+        message = m;
+    }
+
+    public Object getObject() {
+        return continuation.getObject();
+    }
+
+    public boolean isNew() {
+        return continuation.isNew();
+    }
+
+    public boolean isPending() {
+        return continuation.isPending();
+    }
+
+    public boolean isResumed() {
+        return continuation.isResumed();
+    }
+
+    public void reset() {
+        continuation.reset();
+    }
+
+    public void resume() {
+        continuation.resume();
+    }
+
+    public void setObject(Object userObject) {
+        
+        ContinuationInfo ci = null;
+        
+        Object obj = continuation.getObject();
+        if (obj instanceof ContinuationInfo) {
+            ci = (ContinuationInfo)obj;
+        } else {
+            ci = new ContinuationInfo(message);
+            ci.setUserObject(obj);
+        }
+        if (message != userObject) {
+            ci.setUserObject(userObject);
+        }
+        continuation.setObject(ci);
+    }
+
+    public boolean suspend(long timeout) {
+        
+        Object obj = continuation.getObject();
+        if (obj == null) {
+            continuation.setObject(new ContinuationInfo(message));
+        }
+        try {
+            return continuation.suspend(timeout);
+        } catch (RetryRequest ex) {
+            throw new SuspendedInvocationException(ex);
+        }
+    }
+
+    protected Message getMessage() {
+        return message;
+    }
+    
+    protected Continuation getContinuation() {
+        return continuation;
+    }
+    
+    
+}

Propchange: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/spring/JettyHTTPServerEngineBeanDefinitionParser.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/spring/JettyHTTPServerEngineBeanDefinitionParser.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/spring/JettyHTTPServerEngineBeanDefinitionParser.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/spring/JettyHTTPServerEngineBeanDefinitionParser.java Tue Nov 18 04:27:34 2008
@@ -57,6 +57,11 @@
         int port = Integer.valueOf(portStr);
         bean.addPropertyValue("port", port);
                
+        String continuationsStr = element.getAttribute("continuationsEnabled");
+        if (continuationsStr != null && continuationsStr.length() > 0) {
+            bean.addPropertyValue("continuationsEnabled", Boolean.parseBoolean(continuationsStr));
+        }
+        
         MutablePropertyValues engineFactoryProperties = ctx.getContainingBeanDefinition().getPropertyValues();
         PropertyValue busValue = engineFactoryProperties.getPropertyValue("bus");
               

Modified: cxf/trunk/rt/transports/http-jetty/src/main/resources/schemas/configuration/http-jetty.xsd
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/resources/schemas/configuration/http-jetty.xsd?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/resources/schemas/configuration/http-jetty.xsd (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/resources/schemas/configuration/http-jetty.xsd Tue Nov 18 04:27:34 2008
@@ -121,14 +121,21 @@
          <xs:element name="sessionSupport" type="xsd:boolean" minOccurs="0"/>
          <xs:element name="reuseAddress" type="xsd:boolean" minOccurs="0" />          
        </xs:sequence>
-          <xs:attribute name="port" type="xs:int" use="required">
+       
+       <xs:attribute name="port" type="xs:int" use="required">
              <xs:annotation>
                 <xs:documentation>Specifies the port used by the Jetty instance.
                 You can specify a value of 0 for the port attribute. Any threading 
                 properties specified in an engine element with its port attribute
                 set to 0 are used as the configuration for all Jetty listeners that are not explicitly configured.</xs:documentation>
              </xs:annotation>
-          </xs:attribute>
+       </xs:attribute>
+       <xs:attribute name="continuationsEnabled" type="xs:boolean">
+           <xs:annotation>
+                <xs:documentation>Specifies if Jetty Continuations will be explicitly supported
+                by Jetty destinations. Continuations will be checked if this attribute is set to true or                            omitted, ignored otherwise</xs:documentation>
+             </xs:annotation>
+       </xs:attribute>
     </xs:complexType>
     
     <xs:complexType name="JettyHTTPServerEngineFactoryConfigType">

Modified: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java Tue Nov 18 04:27:34 2008
@@ -29,6 +29,7 @@
 
 import javax.servlet.ServletInputStream;
 import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
 import javax.xml.bind.JAXBElement;
 import javax.xml.namespace.QName;
 
@@ -38,6 +39,8 @@
 import org.apache.cxf.common.util.Base64Utility;
 import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.configuration.security.AuthorizationPolicy;
+import org.apache.cxf.continuations.ContinuationInfo;
+import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.endpoint.EndpointResolverRegistry;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.io.AbstractWrappedOutputStream;
@@ -56,6 +59,7 @@
 import org.apache.cxf.transports.http.configuration.HTTPServerPolicy;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.ws.addressing.JAXWSAConstants;
 import org.apache.cxf.ws.policy.PolicyEngine;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
 import org.easymock.classextension.EasyMock;
@@ -65,8 +69,7 @@
 import org.mortbay.jetty.HttpFields;
 import org.mortbay.jetty.Request;
 import org.mortbay.jetty.Response;
-
-import static org.apache.cxf.ws.addressing.JAXWSAConstants.SERVER_ADDRESSING_PROPERTIES_INBOUND;
+import org.mortbay.util.ajax.Continuation;
 
 public class JettyHTTPDestinationTest extends Assert {
     protected static final String AUTH_HEADER = "Authorization";
@@ -175,6 +178,103 @@
     }
     
     @Test
+    public void testSuspendedException() throws Exception {
+        destination = setUpDestination(false, false);
+        setUpDoService(false);
+        final RuntimeException ex = new RuntimeException();
+        observer = new MessageObserver() {
+            public void onMessage(Message m) {
+                throw new SuspendedInvocationException(ex);
+            }
+        };
+        destination.setMessageObserver(observer);
+        try {
+            destination.doService(request, response);
+            fail("Suspended invocation swallowed");
+        } catch (RuntimeException runtimeEx) {
+            assertSame("Original exception is not preserved", ex, runtimeEx);
+        }
+    }
+    
+    @Test
+    public void testRetrieveFromContinuation() throws Exception {
+        
+        Continuation continuation = EasyMock.createMock(Continuation.class);
+        
+        Message m = new MessageImpl();
+        ContinuationInfo ci = new ContinuationInfo(m);
+        Object userObject = new Object();
+        ci.setUserObject(userObject);
+        continuation.getObject();
+        EasyMock.expectLastCall().andReturn(ci);
+        continuation.setObject(ci.getUserObject());
+        EasyMock.expectLastCall();
+        EasyMock.replay(continuation);
+        
+        HttpServletRequest httpRequest = EasyMock.createMock(HttpServletRequest.class);
+        httpRequest.getAttribute("org.mortbay.jetty.ajax.Continuation");
+        EasyMock.expectLastCall().andReturn(continuation);
+        EasyMock.replay(httpRequest);
+        
+        ServiceInfo serviceInfo = new ServiceInfo();
+        serviceInfo.setName(new QName("bla", "Service"));
+        EndpointInfo ei = new EndpointInfo(serviceInfo, "");
+        ei.setName(new QName("bla", "Port"));
+        
+        transportFactory = new JettyHTTPTransportFactory();
+        transportFactory.setBus(new CXFBusImpl());
+        
+        TestJettyDestination testDestination = 
+            new TestJettyDestination(transportFactory.getBus(), 
+                                     transportFactory, ei);
+        testDestination.finalizeConfig();
+        MessageImpl mi = testDestination.retrieveFromContinuation(httpRequest);
+        assertSame("Message is lost", m, mi);
+        EasyMock.verify(continuation);
+        EasyMock.reset(httpRequest);
+        httpRequest.getAttribute("org.mortbay.jetty.ajax.Continuation");
+        EasyMock.expectLastCall().andReturn(null);
+        mi = testDestination.retrieveFromContinuation(httpRequest);
+        assertNotSame("New message expected", m, mi);
+    }
+    
+    @Test
+    public void testContinuationsIgnored() throws Exception {
+        
+        Continuation continuation = EasyMock.createMock(Continuation.class);
+        HttpServletRequest httpRequest = EasyMock.createMock(HttpServletRequest.class);
+        httpRequest.getAttribute("org.mortbay.jetty.ajax.Continuation");
+        EasyMock.expectLastCall().andReturn(continuation);
+        EasyMock.replay(httpRequest);
+        
+        ServiceInfo serviceInfo = new ServiceInfo();
+        serviceInfo.setName(new QName("bla", "Service"));
+        EndpointInfo ei = new EndpointInfo(serviceInfo, "");
+        ei.setName(new QName("bla", "Port"));
+        
+        final JettyHTTPServerEngine httpEngine = new JettyHTTPServerEngine();
+        httpEngine.setContinuationsEnabled(false);
+        JettyHTTPServerEngineFactory factory = new JettyHTTPServerEngineFactory() {
+            @Override
+            public JettyHTTPServerEngine retrieveJettyHTTPServerEngine(int port) {
+                return httpEngine;
+            }
+        };
+        transportFactory = new JettyHTTPTransportFactory();
+        transportFactory.setBus(new CXFBusImpl());
+        transportFactory.getBus().setExtension(
+            factory, JettyHTTPServerEngineFactory.class);
+        
+        
+        TestJettyDestination testDestination = 
+            new TestJettyDestination(transportFactory.getBus(), 
+                                     transportFactory, ei);
+        testDestination.finalizeConfig();
+        MessageImpl mi = testDestination.retrieveFromContinuation(httpRequest);
+        assertNull("Continuations must be ignored", mi);
+    }
+    
+    @Test
     public void testGetMultiple() throws Exception {
         transportFactory = new JettyHTTPTransportFactory();
         transportFactory.setBus(new CXFBusImpl());
@@ -449,7 +549,7 @@
         maps.getToEndpointReference();
         EasyMock.expectLastCall().andReturn(refWithId);
         EasyMock.replay(maps);      
-        context.put(SERVER_ADDRESSING_PROPERTIES_INBOUND, maps);
+        context.put(JAXWSAConstants.SERVER_ADDRESSING_PROPERTIES_INBOUND, maps);
         String result = destination.getId(context);
         assertNotNull(result);
         assertEquals("match our id", result, id);
@@ -495,7 +595,7 @@
             EasyMock.replay(bus);
         }
         
-        engine = EasyMock.createMock(JettyHTTPServerEngine.class);
+        engine = EasyMock.createNiceMock(JettyHTTPServerEngine.class);
         ServiceInfo serviceInfo = new ServiceInfo();
         serviceInfo.setName(new QName("bla", "Service"));        
         endpointInfo = new EndpointInfo(serviceInfo, "");
@@ -506,6 +606,8 @@
         engine.addServant(EasyMock.eq(new URL(NOWHERE + "bar/foo")),
                           EasyMock.isA(JettyHTTPHandler.class));
         EasyMock.expectLastCall();
+        engine.getContinuationsEnabled();
+        EasyMock.expectLastCall().andReturn(true);
         EasyMock.replay(engine);
         
         JettyHTTPDestination dest = new EasyMockJettyHTTPDestination(bus,
@@ -612,6 +714,7 @@
                 EasyMock.expect(request.getQueryString()).andReturn(query);    
                 EasyMock.expect(request.getHeader("Accept")).andReturn("*/*");  
                 EasyMock.expect(request.getContentType()).andReturn("text/xml charset=utf8");
+                EasyMock.expect(request.getAttribute("org.mortbay.jetty.ajax.Continuation")).andReturn(null);
                 
                 HttpFields httpFields = new HttpFields();
                 httpFields.add("content-type", "text/xml");
@@ -855,4 +958,19 @@
     static EndpointReferenceType getEPR(String s) {
         return EndpointReferenceUtils.getEndpointReference(NOWHERE + s);
     }
+    
+    private static class TestJettyDestination extends JettyHTTPDestination {
+        public TestJettyDestination(Bus b,
+                                    JettyHTTPTransportFactory ci, 
+                                    EndpointInfo endpointInfo) throws IOException {
+            super(b, ci, endpointInfo);
+        }
+        
+        @Override
+        public MessageImpl retrieveFromContinuation(HttpServletRequest request) {
+            return super.retrieveFromContinuation(request);
+        }
+        
+        
+    }
 }

Added: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java (added)
+++ cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http_jetty.continuations;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.easymock.classextension.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JettyContinuationProviderTest extends Assert {
+    
+    @Test
+    public void testGetContinuation() {
+        HttpServletRequest httpRequest = EasyMock.createMock(HttpServletRequest.class);
+        Message m = new MessageImpl();
+        m.setExchange(new ExchangeImpl());
+        JettyContinuationProvider provider = new JettyContinuationProvider(httpRequest, m);
+        JettyContinuationWrapper c = (JettyContinuationWrapper)provider.getContinuation();
+        assertSame(m, c.getMessage());
+        assertTrue(c.getContinuation().isNew());
+    }
+    
+    @Test
+    public void testNoContinuationForOneWay() {
+        Exchange exchange = new ExchangeImpl();
+        exchange.setOneWay(true);
+        Message m = new MessageImpl();
+        m.setExchange(exchange);
+        JettyContinuationProvider provider = new JettyContinuationProvider(null, m);
+        assertNull(provider.getContinuation());
+    }
+    
+}

Propchange: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java (added)
+++ cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http_jetty.continuations;
+
+import org.apache.cxf.continuations.ContinuationInfo;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.easymock.classextension.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mortbay.util.ajax.Continuation;
+
+public class JettyContinuationWrapperTest extends Assert {
+
+    @Test
+    public void testContinuationInterface() {
+        Message m = new MessageImpl();
+        ContinuationInfo ci = new ContinuationInfo(m);
+        Object userObject = new Object();
+        
+        Continuation c = EasyMock.createMock(Continuation.class);
+        c.isNew();
+        EasyMock.expectLastCall().andReturn(true);
+        c.isPending();
+        EasyMock.expectLastCall().andReturn(true);
+        c.isResumed();
+        EasyMock.expectLastCall().andReturn(true);
+        c.getObject();
+        EasyMock.expectLastCall().andReturn(ci).times(3);
+        
+        c.setObject(ci);
+        EasyMock.expectLastCall();
+        c.reset();
+        EasyMock.expectLastCall();
+        c.resume();
+        EasyMock.expectLastCall();
+        c.suspend(100);
+        EasyMock.expectLastCall().andReturn(true);
+        EasyMock.replay(c);
+        
+        JettyContinuationWrapper cw = new JettyContinuationWrapper(c, m);
+        cw.isNew();
+        cw.isPending();
+        cw.isResumed();
+        assertSame(ci, cw.getObject());
+        cw.setObject(userObject);
+        cw.reset();
+        cw.resume();
+        cw.suspend(100);
+        EasyMock.verify(c);
+        assertSame(userObject, ci.getUserObject());
+    }
+    
+}

Propchange: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Tue Nov 18 04:27:34 2008
@@ -26,6 +26,7 @@
 import java.io.UnsupportedEncodingException;
 import java.util.Calendar;
 import java.util.GregorianCalendar;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.SimpleTimeZone;
@@ -44,6 +45,8 @@
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.configuration.ConfigurationException;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
@@ -53,6 +56,8 @@
 import org.apache.cxf.transport.AbstractMultiplexDestination;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
+import org.apache.cxf.transport.jms.continuations.JMSContinuationWrapper;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
 import org.springframework.jms.core.JmsTemplate;
@@ -70,6 +75,8 @@
     private JMSConfiguration jmsConfig;
     private Bus bus;
     private DefaultMessageListenerContainer jmsListener;
+    private List<JMSContinuationWrapper> continuations = 
+        new LinkedList<JMSContinuationWrapper>();
 
     public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
         super(b, getTargetReference(info, b), info);
@@ -170,10 +177,18 @@
             inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
             inMessage.setDestination(this);
 
+            inMessage.put(ContinuationProvider.class.getName(), 
+                          new JMSContinuationProvider(bus,
+                                                      inMessage,
+                                                      incomingObserver,
+                                                      continuations));
+            
             BusFactory.setThreadDefaultBus(bus);
 
             // handle the incoming message
             incomingObserver.onMessage(inMessage);
+        } catch (SuspendedInvocationException ex) {
+            System.out.println("Request message has been suspended");
         } catch (UnsupportedEncodingException ex) {
             getLogger().log(Level.WARNING, "can't get the right encoding information. " + ex);
         } finally {

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms.continuations;
+
+import java.util.List;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.ContinuationWrapper;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.MessageObserver;
+
+public class JMSContinuationProvider implements ContinuationProvider {
+
+    private Bus bus;
+    private Message inMessage;
+    private MessageObserver incomingObserver;
+    private List<JMSContinuationWrapper> continuations;
+    
+    public JMSContinuationProvider(Bus b,
+                                   Message m, 
+                                   MessageObserver observer,
+                                   List<JMSContinuationWrapper> cList) {
+        bus = b;
+        inMessage = m;    
+        incomingObserver = observer;
+        continuations = cList;
+    }
+    
+    public ContinuationWrapper getContinuation() {
+        if (inMessage.getExchange().isOneWay()) {
+            return null;
+        }
+        JMSContinuationWrapper cw = inMessage.get(JMSContinuationWrapper.class);
+        if (cw == null) {
+            cw = new JMSContinuationWrapper(bus,
+                                           inMessage, 
+                                           incomingObserver,
+                                           continuations);
+            inMessage.put(JMSContinuationWrapper.class, cw);
+        }
+        return cw;
+        
+        
+    }
+
+}

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms.continuations;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.continuations.ContinuationWrapper;
+import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.MessageObserver;
+
+public class JMSContinuationWrapper implements ContinuationWrapper {
+
+    private Bus bus;
+    private Message inMessage;
+    private MessageObserver incomingObserver;
+    private List<JMSContinuationWrapper> continuations;
+    
+    private Object userObject;
+    
+    private boolean isNew = true;
+    private boolean isPending;
+    private boolean isResumed;
+    private Timer timer = new Timer();
+    
+    public JMSContinuationWrapper(Bus b,
+                                  Message m, 
+                                  MessageObserver observer,
+                                  List<JMSContinuationWrapper> cList) {
+        bus = b;
+        inMessage = m;    
+        incomingObserver = observer;
+        continuations = cList;
+    }    
+    
+    public Object getObject() {
+        return userObject;
+    }
+
+    public boolean isNew() {
+        return isNew;
+    }
+
+    public boolean isPending() {
+        return isPending;
+    }
+
+    public boolean isResumed() {
+        return isResumed;
+    }
+
+    public void reset() {
+        cancelTimerTask();
+        isNew = true;
+        isPending = false;
+        isResumed = false;
+    }
+
+    public void resume() {
+        cancelTimerTask();
+        doResume();
+    }
+    
+    protected void doResume() {
+        if (isResumed) {
+            return;
+        }
+        
+        synchronized (continuations) {
+            continuations.remove(this);
+        }
+        
+        isResumed = true;
+        isPending = false;
+        isNew = false;
+
+        BusFactory.setThreadDefaultBus(bus);
+        try {
+            incomingObserver.onMessage(inMessage);
+        } finally {
+            BusFactory.setThreadDefaultBus(null);
+        }
+    }
+
+    public void setObject(Object o) {
+        userObject = o;
+    }
+
+    public boolean suspend(long timeout) {
+        
+        if (isPending) {
+            return false;
+        }
+        
+        synchronized (continuations) {
+            continuations.add(this);
+        }
+        
+        isNew = false;
+        isResumed = false;
+        isPending = true;
+        
+        if (timeout > 0) {
+            createTimerTask(timeout);
+        }
+        
+        throw new SuspendedInvocationException();
+    }
+
+    protected void createTimerTask(long timeout) {
+        timer.schedule(new TimerTask() {
+            public void run() {
+                doResume();
+            }
+        }, timeout);
+    }
+    
+    protected void cancelTimerTask() {
+        timer.cancel();
+    }
+}

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date