You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2008/11/18 13:27:36 UTC
svn commit: r718565 [1/3] - in /cxf/trunk:
api/src/main/java/org/apache/cxf/continuations/
api/src/main/java/org/apache/cxf/phase/
api/src/test/java/org/apache/cxf/continuations/
api/src/test/java/org/apache/cxf/phase/ rt/core/src/main/java/org/apache/...
Author: sergeyb
Date: Tue Nov 18 04:27:34 2008
New Revision: 718565
URL: http://svn.apache.org/viewvc?rev=718565&view=rev
Log:
CXF-1835,CXF-1912: continuations support for HTTP and JMS transports
Added:
cxf/trunk/api/src/main/java/org/apache/cxf/continuations/
cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java (with props)
cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java (with props)
cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java (with props)
cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java (with props)
cxf/trunk/api/src/test/java/org/apache/cxf/continuations/
cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java (with props)
cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java (with props)
cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java (with props)
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java (with props)
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java (with props)
cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/
cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java (with props)
cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java (with props)
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java (with props)
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java (with props)
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java (with props)
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapperTest.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ClientServerWrappedContinuationTest.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/ControlWorker.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloContinuation.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloContinuationService.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloImplWithWrapppedContinuation.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/HelloWorker.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/cxf.xml (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/jaxws-server.xml (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/jetty-engine.xml (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/http_jetty/continuations/test.wsdl (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/GreeterImplWithContinuationsJMS.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/JMSContinuationsClientServerTest.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server2.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test.wsdl (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test.wsdl (with props)
Modified:
cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java
cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/Messages.properties
cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
cxf/trunk/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JAXWSMethodInvokerTest.java
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/spring/JettyHTTPServerEngineBeanDefinitionParser.java
cxf/trunk/rt/transports/http-jetty/src/main/resources/schemas/configuration/http-jetty.xsd
cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
Added: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+import org.apache.cxf.message.Message;
+
+/**
+ * Utility class which can be used to associate an inbound message, as well as
+ * a user object, if any, with continuation
+ */
+public class ContinuationInfo {
+ private Message currentMessage;
+ private Object userObject;
+
+ public ContinuationInfo(Message m) {
+ currentMessage = m;
+ }
+
+ public Message getMessage() {
+ return currentMessage;
+ }
+
+ public void setUserObject(Object object) {
+ userObject = object;
+ }
+
+ public Object getUserObject() {
+ return userObject;
+ }
+}
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationInfo.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+/**
+ * Provides transport-neutral support for creating suspended invocation primitives
+ * or continuations
+ */
+public interface ContinuationProvider {
+
+ /**
+ * Creates a new continuation or retrieves the existing one
+ * @return transport-neutral ContinuationWrapper
+ */
+ ContinuationWrapper getContinuation();
+}
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationProvider.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+/**
+ * Represents transport-neutral suspended invocation instances
+ * or continuations
+ */
+public interface ContinuationWrapper {
+
+ /**
+ * This method will suspend the request for the timeout or until resume is
+ * called
+ *
+ * @param timeout. A timeout of < 0 will cause an immediate return.
+ * A timeout of 0 will wait indefinitely.
+ * @return True if resume called or false if timeout.
+ */
+ boolean suspend(long timeout);
+
+ /**
+ * Resume a suspended request
+ */
+ void resume();
+
+ /**
+ * Reset the continuation
+ */
+ void reset();
+
+ /**
+ * Is this a newly created Continuation.
+ * @return True if the continuation has just been created and has not yet suspended the request.
+ */
+ boolean isNew();
+
+ /**
+ * Get the pending status
+ * @return True if the continuation has been suspended.
+ */
+ boolean isPending();
+
+ /**
+ * Get the resumed status
+ * @return True if the continuation is has been resumed.
+ */
+ boolean isResumed();
+
+ /**
+ * Get arbitrary object associated with the continuation for context
+ *
+ * @return An arbitrary object associated with the continuation
+ */
+ Object getObject();
+
+ /**
+ * Sets arbitrary object associated with the continuation for context
+ *
+ * @param o An arbitrary object to associate with the continuation
+ */
+ void setObject(Object o);
+}
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/ContinuationWrapper.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+/**
+ * Represents transport-specific exceptions which are used to indicate that
+ * a given invocation was suspended
+ */
+public class SuspendedInvocationException extends RuntimeException {
+
+ public SuspendedInvocationException(Throwable cause) {
+ super(cause);
+ }
+
+ public SuspendedInvocationException() {
+ }
+
+
+ /**
+ * Returns a transport-specific runtime exception
+ * @return RuntimeException the transport-specific runtime exception,
+ * can be null for asynchronous transports
+ */
+ public RuntimeException getRuntimeException() {
+ Throwable ex = getCause();
+ return ex instanceof RuntimeException ? (RuntimeException)ex : null;
+ }
+}
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/continuations/SuspendedInvocationException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Tue Nov 18 04:27:34 2008
@@ -31,6 +31,7 @@
import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.interceptor.InterceptorChain;
@@ -138,6 +139,11 @@
}
}
+ // this method should really be on the InterceptorChain interface
+ public State getState() {
+ return state;
+ }
+
public PhaseInterceptorChain cloneChain() {
return new PhaseInterceptorChain(this);
}
@@ -218,6 +224,13 @@
}
//System.out.println("-----------" + currentInterceptor);
currentInterceptor.handleMessage(message);
+ } catch (SuspendedInvocationException ex) {
+ // we need to resume from the same interceptor the exception got originated from
+ if (iterator.hasPrevious()) {
+ iterator.previous();
+ }
+ pause();
+ throw ex;
} catch (RuntimeException ex) {
if (!faultOccurred) {
Added: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java (added)
+++ cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ContinuationInfoTest extends Assert {
+
+ @Test
+ public void testContinuationInfo() {
+ Message m = new MessageImpl();
+ ContinuationInfo ci = new ContinuationInfo(m);
+ Object userObject = new Object();
+ ci.setUserObject(userObject);
+ assertSame(m, ci.getMessage());
+ assertSame(userObject, ci.getUserObject());
+ }
+
+}
Propchange: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/ContinuationInfoTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java (added)
+++ cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.continuations;
+
+import org.apache.cxf.common.i18n.UncheckedException;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class SuspendedInvocationExceptionTest extends Assert {
+
+ @Test
+ public void testValidRuntimeException() {
+
+ Throwable t = new UncheckedException(new Throwable());
+ SuspendedInvocationException ex = new SuspendedInvocationException(t);
+
+ assertSame(t, ex.getRuntimeException());
+ assertSame(t, ex.getCause());
+
+ }
+
+ @Test
+ public void testNoRuntimeException() {
+
+ SuspendedInvocationException ex = new SuspendedInvocationException(
+ new Throwable());
+
+ assertNull(ex.getRuntimeException());
+ }
+
+}
Propchange: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/api/src/test/java/org/apache/cxf/continuations/SuspendedInvocationExceptionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java (original)
+++ cxf/trunk/api/src/test/java/org/apache/cxf/phase/PhaseInterceptorChainTest.java Tue Nov 18 04:27:34 2008
@@ -27,7 +27,9 @@
import java.util.TreeSet;
import org.apache.cxf.common.util.SortedArraySet;
+import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.interceptor.Interceptor;
+import org.apache.cxf.interceptor.InterceptorChain;
import org.apache.cxf.message.Message;
import org.easymock.classextension.EasyMock;
import org.easymock.classextension.IMocksControl;
@@ -67,6 +69,51 @@
}
@Test
+ public void testState() throws Exception {
+ AbstractPhaseInterceptor p = setUpPhaseInterceptor("phase1", "p1");
+ control.replay();
+ chain.add(p);
+
+ assertSame("Initial state is State.EXECUTING",
+ InterceptorChain.State.EXECUTING, chain.getState());
+ chain.pause();
+ assertSame("Pausing chain should lead to State.PAUSED",
+ InterceptorChain.State.PAUSED, chain.getState());
+ chain.resume();
+ assertSame("Resuming chain should lead to State.COMPLETE",
+ InterceptorChain.State.COMPLETE, chain.getState());
+ chain.abort();
+ assertSame("Aborting chain should lead to State.ABORTED",
+ InterceptorChain.State.ABORTED, chain.getState());
+ }
+
+ @Test
+ public void testSuspendedException() throws Exception {
+ CountingPhaseInterceptor p1 =
+ new CountingPhaseInterceptor("phase1", "p1");
+ SuspendedInvocationInterceptor p2 =
+ new SuspendedInvocationInterceptor("phase2", "p2");
+
+ message.getInterceptorChain();
+ EasyMock.expectLastCall().andReturn(chain).anyTimes();
+
+ control.replay();
+
+ chain.add(p1);
+ chain.add(p2);
+ try {
+ chain.doIntercept(message);
+ fail("Suspended invocation swallowed");
+ } catch (SuspendedInvocationException ex) {
+ // ignore
+ }
+
+ assertSame("No previous interceptor selected", p1, chain.iterator().next());
+ assertSame("Suspended invocation should lead to State.PAUSED",
+ InterceptorChain.State.PAUSED, chain.getState());
+ }
+
+ @Test
public void testAddOneInterceptor() throws Exception {
AbstractPhaseInterceptor p = setUpPhaseInterceptor("phase1", "p1");
control.replay();
@@ -404,7 +451,7 @@
invoked++;
}
}
-
+
public class WrapperingPhaseInterceptor extends CountingPhaseInterceptor {
public WrapperingPhaseInterceptor(String phase, String id) {
super(phase, id);
@@ -416,5 +463,15 @@
}
}
+ public class SuspendedInvocationInterceptor extends AbstractPhaseInterceptor<Message> {
+
+ public SuspendedInvocationInterceptor(String phase, String id) {
+ super(id, phase);
+ }
+
+ public void handleMessage(Message m) {
+ throw new SuspendedInvocationException(new Throwable());
+ }
+ }
}
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/AbstractInvoker.java Tue Nov 18 04:27:34 2008
@@ -29,6 +29,7 @@
import org.apache.cxf.common.i18n.Message;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.frontend.MethodDispatcher;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.interceptor.Fault;
@@ -45,7 +46,6 @@
public abstract class AbstractInvoker implements Invoker {
private static final Logger LOG = LogUtils.getL7dLogger(AbstractInvoker.class);
-
public Object invoke(Exchange exchange, Object o) {
final Object serviceObject = getServiceObject(exchange);
@@ -87,11 +87,18 @@
return new MessageContentsList(res);
} catch (InvocationTargetException e) {
+
Throwable t = e.getCause();
+
if (t == null) {
t = e;
}
+
+ checkSuspendedInvocation(exchange, serviceObject, m, params, t);
+
exchange.getInMessage().put(FaultMode.class, FaultMode.UNCHECKED_APPLICATION_FAULT);
+
+
for (Class<?> cl : m.getExceptionTypes()) {
if (cl.isInstance(t)) {
exchange.getInMessage().put(FaultMode.class,
@@ -105,16 +112,38 @@
throw (Fault)t;
}
throw createFault(t, m, params, true);
+ } catch (SuspendedInvocationException suspendedEx) {
+ // to avoid duplicating the same log statement
+ checkSuspendedInvocation(exchange, serviceObject, m, params, suspendedEx);
+ // unreachable
+ throw suspendedEx;
} catch (Fault f) {
exchange.getInMessage().put(FaultMode.class, FaultMode.UNCHECKED_APPLICATION_FAULT);
throw f;
} catch (Exception e) {
+ checkSuspendedInvocation(exchange, serviceObject, m, params, e);
exchange.getInMessage().put(FaultMode.class, FaultMode.UNCHECKED_APPLICATION_FAULT);
throw createFault(e, m, params, false);
}
}
+ protected void checkSuspendedInvocation(Exchange exchange,
+ Object serviceObject,
+ Method m,
+ List<Object> params,
+ Throwable t) {
+ if (t instanceof SuspendedInvocationException) {
+
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "SUSPENDED_INVOCATION_EXCEPTION",
+ new Object[]{serviceObject, m.toString(), params});
+ }
+ throw (SuspendedInvocationException)t;
+ }
+ }
+
protected Fault createFault(Throwable ex, Method m, List<Object> params, boolean checked) {
+
if (checked) {
return new Fault(ex);
} else {
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/Messages.properties
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/Messages.properties?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/Messages.properties (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/service/invoker/Messages.properties Tue Nov 18 04:27:34 2008
@@ -24,4 +24,5 @@
ILLEGAL_ACCESS=Couldn't access service object.
CREATE_SERVICE_OBJECT_EXC=Couldn't instantiate service object.
EXCEPTION_INVOKING_OBJECT={0} while invoking {1} with params {2}.
+SUSPENDED_INVOCATION_EXCEPTION=Invocation of method {1} on object {0} with params {2} has been suspended.
INVOKING_METHOD=Invoking method {1} on object {0} with params {2}.
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java Tue Nov 18 04:27:34 2008
@@ -29,6 +29,7 @@
import org.apache.cxf.BusFactory;
import org.apache.cxf.binding.Binding;
import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.interceptor.InterceptorChain;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
@@ -55,6 +56,16 @@
Bus origBus = BusFactory.getThreadDefaultBus(false);
BusFactory.setThreadDefaultBus(bus);
try {
+ PhaseInterceptorChain phaseChain = null;
+
+ if (m.getInterceptorChain() instanceof PhaseInterceptorChain) {
+ phaseChain = (PhaseInterceptorChain)m.getInterceptorChain();
+ if (phaseChain.getState() == InterceptorChain.State.PAUSED) {
+ phaseChain.resume();
+ return;
+ }
+ }
+
Message message = getBinding().createMessage(m);
Exchange exchange = message.getExchange();
if (exchange == null) {
@@ -64,18 +75,18 @@
setExchangeProperties(exchange, message);
// setup chain
- PhaseInterceptorChain chain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
+ phaseChain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
bus.getInInterceptors(),
endpoint.getService().getInInterceptors(),
endpoint.getInInterceptors(),
getBinding().getInInterceptors());
- message.setInterceptorChain(chain);
+ message.setInterceptorChain(phaseChain);
- chain.setFaultObserver(endpoint.getOutFaultObserver());
+ phaseChain.setFaultObserver(endpoint.getOutFaultObserver());
- chain.doIntercept(message);
+ phaseChain.doIntercept(message);
} finally {
BusFactory.setThreadDefaultBus(origBus);
}
Added: cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java (added)
+++ cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.phase.PhaseInterceptorChain;
+import org.easymock.classextension.EasyMock;
+import org.easymock.classextension.IMocksControl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChainInitiationObserverTest extends Assert {
+
+ private IMocksControl control;
+ private TestChain chain;
+ private Message message;
+ private ChainInitiationObserver observer;
+
+ @Before
+ public void setUp() {
+
+ control = EasyMock.createNiceControl();
+ message = control.createMock(Message.class);
+
+ Phase phase1 = new Phase("phase1", 1);
+ SortedSet<Phase> phases = new TreeSet<Phase>();
+ phases.add(phase1);
+ chain = new TestChain(phases);
+ observer = new ChainInitiationObserver(null, BusFactory.getDefaultBus());
+ }
+
+ @After
+ public void tearDown() {
+ control.verify();
+ }
+
+ @Test
+ public void testPausedChain() {
+ message.getInterceptorChain();
+ EasyMock.expectLastCall().andReturn(chain).times(2);
+ control.replay();
+
+ observer.onMessage(message);
+ assertTrue(chain.isInvoked());
+ }
+
+ private static class TestChain extends PhaseInterceptorChain {
+
+ private boolean invoked;
+
+ public TestChain(SortedSet<Phase> ps) {
+ super(ps);
+ }
+
+ @Override
+ public void resume() {
+ invoked = true;
+ }
+
+ @Override
+ public State getState() {
+ return State.PAUSED;
+ }
+
+ public boolean isInvoked() {
+ return invoked;
+ }
+ }
+}
Propchange: cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/ChainInitiationObserverTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JAXWSMethodInvokerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JAXWSMethodInvokerTest.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JAXWSMethodInvokerTest.java (original)
+++ cxf/trunk/rt/frontend/jaxws/src/test/java/org/apache/cxf/jaxws/JAXWSMethodInvokerTest.java Tue Nov 18 04:27:34 2008
@@ -19,14 +19,21 @@
package org.apache.cxf.jaxws;
+import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.frontend.MethodDispatcher;
import org.apache.cxf.jaxws.service.Hello;
import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageContentsList;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.Service;
import org.apache.cxf.service.invoker.Factory;
+import org.apache.cxf.service.model.BindingOperationInfo;
import org.easymock.classextension.EasyMock;
import org.junit.Assert;
import org.junit.Test;
-public class JAXWSMethodInvokerTest {
+public class JAXWSMethodInvokerTest extends Assert {
Factory factory = EasyMock.createMock(Factory.class);
Object target = EasyMock.createMock(Hello.class);
@@ -39,9 +46,75 @@
EasyMock.replay(factory);
JAXWSMethodInvoker jaxwsMethodInvoker = new JAXWSMethodInvoker(factory);
Object object = jaxwsMethodInvoker.getServiceObject(ex);
- Assert.assertEquals("the target object and service object should be equal ", object, target);
+ assertEquals("the target object and service object should be equal ", object, target);
EasyMock.verify(factory);
}
+ @Test
+ public void testSuspendedException() throws Throwable {
+ Exchange ex = EasyMock.createNiceMock(Exchange.class);
+
+ Exception originalException = new RuntimeException();
+ ContinuationService serviceObject =
+ new ContinuationService(originalException);
+ EasyMock.reset(factory);
+ factory.create(ex);
+ EasyMock.expectLastCall().andReturn(serviceObject);
+ factory.release(ex, serviceObject);
+ EasyMock.expectLastCall();
+ EasyMock.replay(factory);
+
+ Message inMessage = new MessageImpl();
+ ex.getInMessage();
+ EasyMock.expectLastCall().andReturn(inMessage);
+ inMessage.setExchange(ex);
+ inMessage.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+
+ BindingOperationInfo boi = EasyMock.createMock(BindingOperationInfo.class);
+ ex.get(BindingOperationInfo.class);
+ EasyMock.expectLastCall().andReturn(boi);
+
+ Service serviceClass = EasyMock.createMock(Service.class);
+ ex.get(Service.class);
+ EasyMock.expectLastCall().andReturn(serviceClass);
+
+ MethodDispatcher md = EasyMock.createMock(MethodDispatcher.class);
+ serviceClass.get(MethodDispatcher.class.getName());
+ EasyMock.expectLastCall().andReturn(md);
+
+ md.getMethod(boi);
+ EasyMock.expectLastCall().andReturn(
+ ContinuationService.class.getMethod("invoke", new Class[]{}));
+
+ EasyMock.replay(ex);
+ EasyMock.replay(md);
+ EasyMock.replay(serviceClass);
+
+ JAXWSMethodInvoker jaxwsMethodInvoker = new JAXWSMethodInvoker(factory);
+ try {
+ jaxwsMethodInvoker.invoke(ex, new MessageContentsList(new Object[]{}));
+ fail("Suspended invocation swallowed");
+ } catch (SuspendedInvocationException suspendedEx) {
+ assertSame(suspendedEx, serviceObject.getSuspendedException());
+ assertSame(originalException, suspendedEx.getRuntimeException());
+ }
+
+ }
+
+ public static class ContinuationService {
+ private RuntimeException ex;
+
+ public ContinuationService(Exception throwable) {
+ ex = new SuspendedInvocationException(throwable);
+ }
+
+ public void invoke() {
+ throw ex;
+ }
+
+ public Throwable getSuspendedException() {
+ return ex;
+ }
+ }
}
Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java Tue Nov 18 04:27:34 2008
@@ -33,24 +33,29 @@
import org.apache.cxf.BusFactory;
import org.apache.cxf.common.i18n.Message;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.continuations.ContinuationInfo;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.AbstractHTTPDestination;
import org.apache.cxf.transport.http.HTTPSession;
+import org.apache.cxf.transport.http_jetty.continuations.JettyContinuationProvider;
import org.apache.cxf.transports.http.QueryHandler;
import org.apache.cxf.transports.http.QueryHandlerRegistry;
import org.apache.cxf.transports.http.StemMatchingQueryHandler;
import org.mortbay.jetty.HttpConnection;
import org.mortbay.jetty.Request;
+import org.mortbay.util.ajax.Continuation;
+import org.mortbay.util.ajax.ContinuationSupport;
public class JettyHTTPDestination extends AbstractHTTPDestination {
private static final Logger LOG =
LogUtils.getL7dLogger(JettyHTTPDestination.class);
-
protected JettyHTTPServerEngine engine;
protected JettyHTTPTransportFactory transportFactory;
protected JettyHTTPServerEngineFactory serverEngineFactory;
@@ -78,6 +83,7 @@
JettyHTTPTransportFactory ci,
EndpointInfo endpointInfo
) throws IOException {
+
//Add the defualt port if the address is missing it
super(b, ci, endpointInfo, true);
this.transportFactory = ci;
@@ -261,24 +267,39 @@
throws IOException {
Request baseRequest = (req instanceof Request)
? (Request)req : HttpConnection.getCurrentConnection().getRequest();
- try {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Service http request on thread: " + Thread.currentThread());
+
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Service http request on thread: " + Thread.currentThread());
+ }
+ MessageImpl inMessage = retrieveFromContinuation(req);
+
+
+ if (inMessage == null) {
+
+ inMessage = new MessageImpl();
+ if (engine.getContinuationsEnabled()) {
+ inMessage.put(ContinuationProvider.class.getName(),
+ new JettyContinuationProvider(req, inMessage));
}
-
- MessageImpl inMessage = new MessageImpl();
+
setupMessage(inMessage, context, req, resp);
inMessage.setDestination(this);
-
+
ExchangeImpl exchange = new ExchangeImpl();
exchange.setInMessage(inMessage);
exchange.setSession(new HTTPSession(req));
-
- incomingObserver.onMessage(inMessage);
+ }
+ try {
+ incomingObserver.onMessage(inMessage);
+
resp.flushBuffer();
baseRequest.setHandled(true);
+ } catch (SuspendedInvocationException ex) {
+ throw ex.getRuntimeException();
+ } catch (RuntimeException ex) {
+ throw ex;
} finally {
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Finished servicing http request on thread: " + Thread.currentThread());
@@ -286,6 +307,39 @@
}
}
+ protected MessageImpl retrieveFromContinuation(HttpServletRequest req) {
+ MessageImpl m = null;
+
+ if (!engine.getContinuationsEnabled()) {
+ return null;
+ }
+
+ Continuation cont = ContinuationSupport.getContinuation(req, null);
+ synchronized (cont) {
+ Object o = cont.getObject();
+ if (o instanceof ContinuationInfo) {
+ ContinuationInfo ci = (ContinuationInfo)o;
+ m = (MessageImpl)ci.getMessage();
+
+ // now that we got the message we don't need ContinuationInfo
+ // as we don't know how continuation was suspended, by jetty wrapper
+ // or directly in which (latter) case we need to ensure that an original user object
+ // if any, need to be restored
+ cont.setObject(ci.getUserObject());
+ }
+ if (m == null && !cont.isNew()) {
+ String message = "No message for existing continuation, status : "
+ + (cont.isPending() ? "Pending" : "Resumed");
+ if (!(o instanceof ContinuationInfo)) {
+ message += ", ContinuationInfo object is unavailable";
+ }
+ LOG.warning(message);
+ }
+ }
+
+ return m;
+ }
+
@Override
public void shutdown() {
transportFactory.removeDestination(endpointInfo);
Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java Tue Nov 18 04:27:34 2008
@@ -92,6 +92,7 @@
private Boolean isSessionSupport = false;
private Boolean isReuseAddress = true;
+ private Boolean continuationsEnabled = true;
private int servantCount;
private Server server;
private Connector connector;
@@ -99,6 +100,7 @@
private JettyConnectorFactory connectorFactory;
private ContextHandlerCollection contexts;
+
/**
* This field holds the TLS ServerParameters that are programatically
* configured. The tlsServerParamers (due to JAXB) holds the struct
@@ -116,7 +118,7 @@
* has been called.
*/
private boolean configFinalized;
-
+
/**
* This constructor is called by the JettyHTTPServerEngineFactory.
*/
@@ -140,6 +142,15 @@
public void setPort(int p) {
port = p;
}
+
+ public void setContinuationsEnabled(boolean enabled) {
+ continuationsEnabled = enabled;
+ }
+
+ public boolean getContinuationsEnabled() {
+ return continuationsEnabled;
+ }
+
/**
* The bus.
*/
Added: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java (added)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http_jetty.continuations;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.ContinuationWrapper;
+import org.apache.cxf.message.Message;
+import org.mortbay.util.ajax.Continuation;
+import org.mortbay.util.ajax.ContinuationSupport;
+
+public class JettyContinuationProvider implements ContinuationProvider {
+
+ private HttpServletRequest request;
+ private Message inMessage;
+
+ public JettyContinuationProvider(HttpServletRequest req, Message m) {
+ request = req;
+ this.inMessage = m;
+ }
+
+ public ContinuationWrapper getContinuation() {
+ if (inMessage.getExchange().isOneWay()) {
+ return null;
+ }
+ Continuation cont = ContinuationSupport.getContinuation(request, null);
+ return new JettyContinuationWrapper(cont, inMessage);
+ }
+
+}
Propchange: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProvider.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java (added)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http_jetty.continuations;
+
+import org.apache.cxf.continuations.ContinuationInfo;
+import org.apache.cxf.continuations.ContinuationWrapper;
+import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.message.Message;
+import org.mortbay.jetty.RetryRequest;
+import org.mortbay.util.ajax.Continuation;
+
+public class JettyContinuationWrapper implements ContinuationWrapper {
+
+ private Continuation continuation;
+ private Message message;
+
+
+ public JettyContinuationWrapper(Continuation c, Message m) {
+ continuation = c;
+ message = m;
+ }
+
+ public Object getObject() {
+ return continuation.getObject();
+ }
+
+ public boolean isNew() {
+ return continuation.isNew();
+ }
+
+ public boolean isPending() {
+ return continuation.isPending();
+ }
+
+ public boolean isResumed() {
+ return continuation.isResumed();
+ }
+
+ public void reset() {
+ continuation.reset();
+ }
+
+ public void resume() {
+ continuation.resume();
+ }
+
+ public void setObject(Object userObject) {
+
+ ContinuationInfo ci = null;
+
+ Object obj = continuation.getObject();
+ if (obj instanceof ContinuationInfo) {
+ ci = (ContinuationInfo)obj;
+ } else {
+ ci = new ContinuationInfo(message);
+ ci.setUserObject(obj);
+ }
+ if (message != userObject) {
+ ci.setUserObject(userObject);
+ }
+ continuation.setObject(ci);
+ }
+
+ public boolean suspend(long timeout) {
+
+ Object obj = continuation.getObject();
+ if (obj == null) {
+ continuation.setObject(new ContinuationInfo(message));
+ }
+ try {
+ return continuation.suspend(timeout);
+ } catch (RetryRequest ex) {
+ throw new SuspendedInvocationException(ex);
+ }
+ }
+
+ protected Message getMessage() {
+ return message;
+ }
+
+ protected Continuation getContinuation() {
+ return continuation;
+ }
+
+
+}
Propchange: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/spring/JettyHTTPServerEngineBeanDefinitionParser.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/spring/JettyHTTPServerEngineBeanDefinitionParser.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/spring/JettyHTTPServerEngineBeanDefinitionParser.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/spring/JettyHTTPServerEngineBeanDefinitionParser.java Tue Nov 18 04:27:34 2008
@@ -57,6 +57,11 @@
int port = Integer.valueOf(portStr);
bean.addPropertyValue("port", port);
+ String continuationsStr = element.getAttribute("continuationsEnabled");
+ if (continuationsStr != null && continuationsStr.length() > 0) {
+ bean.addPropertyValue("continuationsEnabled", Boolean.parseBoolean(continuationsStr));
+ }
+
MutablePropertyValues engineFactoryProperties = ctx.getContainingBeanDefinition().getPropertyValues();
PropertyValue busValue = engineFactoryProperties.getPropertyValue("bus");
Modified: cxf/trunk/rt/transports/http-jetty/src/main/resources/schemas/configuration/http-jetty.xsd
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/resources/schemas/configuration/http-jetty.xsd?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/resources/schemas/configuration/http-jetty.xsd (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/resources/schemas/configuration/http-jetty.xsd Tue Nov 18 04:27:34 2008
@@ -121,14 +121,21 @@
<xs:element name="sessionSupport" type="xsd:boolean" minOccurs="0"/>
<xs:element name="reuseAddress" type="xsd:boolean" minOccurs="0" />
</xs:sequence>
- <xs:attribute name="port" type="xs:int" use="required">
+
+ <xs:attribute name="port" type="xs:int" use="required">
<xs:annotation>
<xs:documentation>Specifies the port used by the Jetty instance.
You can specify a value of 0 for the port attribute. Any threading
properties specified in an engine element with its port attribute
set to 0 are used as the configuration for all Jetty listeners that are not explicitly configured.</xs:documentation>
</xs:annotation>
- </xs:attribute>
+ </xs:attribute>
+ <xs:attribute name="continuationsEnabled" type="xs:boolean">
+ <xs:annotation>
+ <xs:documentation>Specifies if Jetty Continuations will be explicitly supported
+ by Jetty destinations. Continuations will be checked if this attribute is set to true or omitted, ignored otherwise</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
</xs:complexType>
<xs:complexType name="JettyHTTPServerEngineFactoryConfigType">
Modified: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java Tue Nov 18 04:27:34 2008
@@ -29,6 +29,7 @@
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
import javax.xml.bind.JAXBElement;
import javax.xml.namespace.QName;
@@ -38,6 +39,8 @@
import org.apache.cxf.common.util.Base64Utility;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.configuration.security.AuthorizationPolicy;
+import org.apache.cxf.continuations.ContinuationInfo;
+import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.endpoint.EndpointResolverRegistry;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.io.AbstractWrappedOutputStream;
@@ -56,6 +59,7 @@
import org.apache.cxf.transports.http.configuration.HTTPServerPolicy;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.ws.addressing.JAXWSAConstants;
import org.apache.cxf.ws.policy.PolicyEngine;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
import org.easymock.classextension.EasyMock;
@@ -65,8 +69,7 @@
import org.mortbay.jetty.HttpFields;
import org.mortbay.jetty.Request;
import org.mortbay.jetty.Response;
-
-import static org.apache.cxf.ws.addressing.JAXWSAConstants.SERVER_ADDRESSING_PROPERTIES_INBOUND;
+import org.mortbay.util.ajax.Continuation;
public class JettyHTTPDestinationTest extends Assert {
protected static final String AUTH_HEADER = "Authorization";
@@ -175,6 +178,103 @@
}
@Test
+ public void testSuspendedException() throws Exception {
+ destination = setUpDestination(false, false);
+ setUpDoService(false);
+ final RuntimeException ex = new RuntimeException();
+ observer = new MessageObserver() {
+ public void onMessage(Message m) {
+ throw new SuspendedInvocationException(ex);
+ }
+ };
+ destination.setMessageObserver(observer);
+ try {
+ destination.doService(request, response);
+ fail("Suspended invocation swallowed");
+ } catch (RuntimeException runtimeEx) {
+ assertSame("Original exception is not preserved", ex, runtimeEx);
+ }
+ }
+
+ @Test
+ public void testRetrieveFromContinuation() throws Exception {
+
+ Continuation continuation = EasyMock.createMock(Continuation.class);
+
+ Message m = new MessageImpl();
+ ContinuationInfo ci = new ContinuationInfo(m);
+ Object userObject = new Object();
+ ci.setUserObject(userObject);
+ continuation.getObject();
+ EasyMock.expectLastCall().andReturn(ci);
+ continuation.setObject(ci.getUserObject());
+ EasyMock.expectLastCall();
+ EasyMock.replay(continuation);
+
+ HttpServletRequest httpRequest = EasyMock.createMock(HttpServletRequest.class);
+ httpRequest.getAttribute("org.mortbay.jetty.ajax.Continuation");
+ EasyMock.expectLastCall().andReturn(continuation);
+ EasyMock.replay(httpRequest);
+
+ ServiceInfo serviceInfo = new ServiceInfo();
+ serviceInfo.setName(new QName("bla", "Service"));
+ EndpointInfo ei = new EndpointInfo(serviceInfo, "");
+ ei.setName(new QName("bla", "Port"));
+
+ transportFactory = new JettyHTTPTransportFactory();
+ transportFactory.setBus(new CXFBusImpl());
+
+ TestJettyDestination testDestination =
+ new TestJettyDestination(transportFactory.getBus(),
+ transportFactory, ei);
+ testDestination.finalizeConfig();
+ MessageImpl mi = testDestination.retrieveFromContinuation(httpRequest);
+ assertSame("Message is lost", m, mi);
+ EasyMock.verify(continuation);
+ EasyMock.reset(httpRequest);
+ httpRequest.getAttribute("org.mortbay.jetty.ajax.Continuation");
+ EasyMock.expectLastCall().andReturn(null);
+ mi = testDestination.retrieveFromContinuation(httpRequest);
+ assertNotSame("New message expected", m, mi);
+ }
+
+ @Test
+ public void testContinuationsIgnored() throws Exception {
+
+ Continuation continuation = EasyMock.createMock(Continuation.class);
+ HttpServletRequest httpRequest = EasyMock.createMock(HttpServletRequest.class);
+ httpRequest.getAttribute("org.mortbay.jetty.ajax.Continuation");
+ EasyMock.expectLastCall().andReturn(continuation);
+ EasyMock.replay(httpRequest);
+
+ ServiceInfo serviceInfo = new ServiceInfo();
+ serviceInfo.setName(new QName("bla", "Service"));
+ EndpointInfo ei = new EndpointInfo(serviceInfo, "");
+ ei.setName(new QName("bla", "Port"));
+
+ final JettyHTTPServerEngine httpEngine = new JettyHTTPServerEngine();
+ httpEngine.setContinuationsEnabled(false);
+ JettyHTTPServerEngineFactory factory = new JettyHTTPServerEngineFactory() {
+ @Override
+ public JettyHTTPServerEngine retrieveJettyHTTPServerEngine(int port) {
+ return httpEngine;
+ }
+ };
+ transportFactory = new JettyHTTPTransportFactory();
+ transportFactory.setBus(new CXFBusImpl());
+ transportFactory.getBus().setExtension(
+ factory, JettyHTTPServerEngineFactory.class);
+
+
+ TestJettyDestination testDestination =
+ new TestJettyDestination(transportFactory.getBus(),
+ transportFactory, ei);
+ testDestination.finalizeConfig();
+ MessageImpl mi = testDestination.retrieveFromContinuation(httpRequest);
+ assertNull("Continuations must be ignored", mi);
+ }
+
+ @Test
public void testGetMultiple() throws Exception {
transportFactory = new JettyHTTPTransportFactory();
transportFactory.setBus(new CXFBusImpl());
@@ -449,7 +549,7 @@
maps.getToEndpointReference();
EasyMock.expectLastCall().andReturn(refWithId);
EasyMock.replay(maps);
- context.put(SERVER_ADDRESSING_PROPERTIES_INBOUND, maps);
+ context.put(JAXWSAConstants.SERVER_ADDRESSING_PROPERTIES_INBOUND, maps);
String result = destination.getId(context);
assertNotNull(result);
assertEquals("match our id", result, id);
@@ -495,7 +595,7 @@
EasyMock.replay(bus);
}
- engine = EasyMock.createMock(JettyHTTPServerEngine.class);
+ engine = EasyMock.createNiceMock(JettyHTTPServerEngine.class);
ServiceInfo serviceInfo = new ServiceInfo();
serviceInfo.setName(new QName("bla", "Service"));
endpointInfo = new EndpointInfo(serviceInfo, "");
@@ -506,6 +606,8 @@
engine.addServant(EasyMock.eq(new URL(NOWHERE + "bar/foo")),
EasyMock.isA(JettyHTTPHandler.class));
EasyMock.expectLastCall();
+ engine.getContinuationsEnabled();
+ EasyMock.expectLastCall().andReturn(true);
EasyMock.replay(engine);
JettyHTTPDestination dest = new EasyMockJettyHTTPDestination(bus,
@@ -612,6 +714,7 @@
EasyMock.expect(request.getQueryString()).andReturn(query);
EasyMock.expect(request.getHeader("Accept")).andReturn("*/*");
EasyMock.expect(request.getContentType()).andReturn("text/xml charset=utf8");
+ EasyMock.expect(request.getAttribute("org.mortbay.jetty.ajax.Continuation")).andReturn(null);
HttpFields httpFields = new HttpFields();
httpFields.add("content-type", "text/xml");
@@ -855,4 +958,19 @@
static EndpointReferenceType getEPR(String s) {
return EndpointReferenceUtils.getEndpointReference(NOWHERE + s);
}
+
+ private static class TestJettyDestination extends JettyHTTPDestination {
+ public TestJettyDestination(Bus b,
+ JettyHTTPTransportFactory ci,
+ EndpointInfo endpointInfo) throws IOException {
+ super(b, ci, endpointInfo);
+ }
+
+ @Override
+ public MessageImpl retrieveFromContinuation(HttpServletRequest request) {
+ return super.retrieveFromContinuation(request);
+ }
+
+
+ }
}
Added: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java (added)
+++ cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http_jetty.continuations;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.easymock.classextension.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JettyContinuationProviderTest extends Assert {
+
+ @Test
+ public void testGetContinuation() {
+ HttpServletRequest httpRequest = EasyMock.createMock(HttpServletRequest.class);
+ Message m = new MessageImpl();
+ m.setExchange(new ExchangeImpl());
+ JettyContinuationProvider provider = new JettyContinuationProvider(httpRequest, m);
+ JettyContinuationWrapper c = (JettyContinuationWrapper)provider.getContinuation();
+ assertSame(m, c.getMessage());
+ assertTrue(c.getContinuation().isNew());
+ }
+
+ @Test
+ public void testNoContinuationForOneWay() {
+ Exchange exchange = new ExchangeImpl();
+ exchange.setOneWay(true);
+ Message m = new MessageImpl();
+ m.setExchange(exchange);
+ JettyContinuationProvider provider = new JettyContinuationProvider(null, m);
+ assertNull(provider.getContinuation());
+ }
+
+}
Propchange: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationProviderTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java (added)
+++ cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http_jetty.continuations;
+
+import org.apache.cxf.continuations.ContinuationInfo;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.easymock.classextension.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mortbay.util.ajax.Continuation;
+
+public class JettyContinuationWrapperTest extends Assert {
+
+ @Test
+ public void testContinuationInterface() {
+ Message m = new MessageImpl();
+ ContinuationInfo ci = new ContinuationInfo(m);
+ Object userObject = new Object();
+
+ Continuation c = EasyMock.createMock(Continuation.class);
+ c.isNew();
+ EasyMock.expectLastCall().andReturn(true);
+ c.isPending();
+ EasyMock.expectLastCall().andReturn(true);
+ c.isResumed();
+ EasyMock.expectLastCall().andReturn(true);
+ c.getObject();
+ EasyMock.expectLastCall().andReturn(ci).times(3);
+
+ c.setObject(ci);
+ EasyMock.expectLastCall();
+ c.reset();
+ EasyMock.expectLastCall();
+ c.resume();
+ EasyMock.expectLastCall();
+ c.suspend(100);
+ EasyMock.expectLastCall().andReturn(true);
+ EasyMock.replay(c);
+
+ JettyContinuationWrapper cw = new JettyContinuationWrapper(c, m);
+ cw.isNew();
+ cw.isPending();
+ cw.isResumed();
+ assertSame(ci, cw.getObject());
+ cw.setObject(userObject);
+ cw.reset();
+ cw.resume();
+ cw.suspend(100);
+ EasyMock.verify(c);
+ assertSame(userObject, ci.getUserObject());
+ }
+
+}
Propchange: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapperTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=718565&r1=718564&r2=718565&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Tue Nov 18 04:27:34 2008
@@ -26,6 +26,7 @@
import java.io.UnsupportedEncodingException;
import java.util.Calendar;
import java.util.GregorianCalendar;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SimpleTimeZone;
@@ -44,6 +45,8 @@
import org.apache.cxf.BusFactory;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.ConfigurationException;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
@@ -53,6 +56,8 @@
import org.apache.cxf.transport.AbstractMultiplexDestination;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
+import org.apache.cxf.transport.jms.continuations.JMSContinuationWrapper;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
import org.springframework.jms.core.JmsTemplate;
@@ -70,6 +75,8 @@
private JMSConfiguration jmsConfig;
private Bus bus;
private DefaultMessageListenerContainer jmsListener;
+ private List<JMSContinuationWrapper> continuations =
+ new LinkedList<JMSContinuationWrapper>();
public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
super(b, getTargetReference(info, b), info);
@@ -170,10 +177,18 @@
inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
inMessage.setDestination(this);
+ inMessage.put(ContinuationProvider.class.getName(),
+ new JMSContinuationProvider(bus,
+ inMessage,
+ incomingObserver,
+ continuations));
+
BusFactory.setThreadDefaultBus(bus);
// handle the incoming message
incomingObserver.onMessage(inMessage);
+ } catch (SuspendedInvocationException ex) {
+ System.out.println("Request message has been suspended");
} catch (UnsupportedEncodingException ex) {
getLogger().log(Level.WARNING, "can't get the right encoding information. " + ex);
} finally {
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms.continuations;
+
+import java.util.List;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.ContinuationWrapper;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.MessageObserver;
+
+public class JMSContinuationProvider implements ContinuationProvider {
+
+ private Bus bus;
+ private Message inMessage;
+ private MessageObserver incomingObserver;
+ private List<JMSContinuationWrapper> continuations;
+
+ public JMSContinuationProvider(Bus b,
+ Message m,
+ MessageObserver observer,
+ List<JMSContinuationWrapper> cList) {
+ bus = b;
+ inMessage = m;
+ incomingObserver = observer;
+ continuations = cList;
+ }
+
+ public ContinuationWrapper getContinuation() {
+ if (inMessage.getExchange().isOneWay()) {
+ return null;
+ }
+ JMSContinuationWrapper cw = inMessage.get(JMSContinuationWrapper.class);
+ if (cw == null) {
+ cw = new JMSContinuationWrapper(bus,
+ inMessage,
+ incomingObserver,
+ continuations);
+ inMessage.put(JMSContinuationWrapper.class, cw);
+ }
+ return cw;
+
+
+ }
+
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java?rev=718565&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java Tue Nov 18 04:27:34 2008
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms.continuations;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.continuations.ContinuationWrapper;
+import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.MessageObserver;
+
+public class JMSContinuationWrapper implements ContinuationWrapper {
+
+ private Bus bus;
+ private Message inMessage;
+ private MessageObserver incomingObserver;
+ private List<JMSContinuationWrapper> continuations;
+
+ private Object userObject;
+
+ private boolean isNew = true;
+ private boolean isPending;
+ private boolean isResumed;
+ private Timer timer = new Timer();
+
+ public JMSContinuationWrapper(Bus b,
+ Message m,
+ MessageObserver observer,
+ List<JMSContinuationWrapper> cList) {
+ bus = b;
+ inMessage = m;
+ incomingObserver = observer;
+ continuations = cList;
+ }
+
+ public Object getObject() {
+ return userObject;
+ }
+
+ public boolean isNew() {
+ return isNew;
+ }
+
+ public boolean isPending() {
+ return isPending;
+ }
+
+ public boolean isResumed() {
+ return isResumed;
+ }
+
+ public void reset() {
+ cancelTimerTask();
+ isNew = true;
+ isPending = false;
+ isResumed = false;
+ }
+
+ public void resume() {
+ cancelTimerTask();
+ doResume();
+ }
+
+ protected void doResume() {
+ if (isResumed) {
+ return;
+ }
+
+ synchronized (continuations) {
+ continuations.remove(this);
+ }
+
+ isResumed = true;
+ isPending = false;
+ isNew = false;
+
+ BusFactory.setThreadDefaultBus(bus);
+ try {
+ incomingObserver.onMessage(inMessage);
+ } finally {
+ BusFactory.setThreadDefaultBus(null);
+ }
+ }
+
+ public void setObject(Object o) {
+ userObject = o;
+ }
+
+ public boolean suspend(long timeout) {
+
+ if (isPending) {
+ return false;
+ }
+
+ synchronized (continuations) {
+ continuations.add(this);
+ }
+
+ isNew = false;
+ isResumed = false;
+ isPending = true;
+
+ if (timeout > 0) {
+ createTimerTask(timeout);
+ }
+
+ throw new SuspendedInvocationException();
+ }
+
+ protected void createTimerTask(long timeout) {
+ timer.schedule(new TimerTask() {
+ public void run() {
+ doResume();
+ }
+ }, timeout);
+ }
+
+ protected void cancelTimerTask() {
+ timer.cancel();
+ }
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationWrapper.java
------------------------------------------------------------------------------
svn:keywords = Rev Date