You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by na...@apache.org on 2006/12/02 06:38:23 UTC
svn commit: r481505 - in
/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2: engine/
transport/ transport/http/
Author: nagy
Date: Fri Dec 1 21:38:19 2006
New Revision: 481505
URL: http://svn.apache.org/viewvc?view=rev&rev=481505
Log:
Extended RequestResponseTransport to allow the transport to block after control is returned (e.g. if the message is paused) and then resume once a response is available. This will enable RM to be used for IN-OUT MEPs over a request/response transport.
Modified:
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/engine/AxisEngine.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/RequestResponseTransport.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/AxisServlet.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPTransportUtils.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPWorker.java
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/engine/AxisEngine.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/engine/AxisEngine.java?view=diff&rev=481505&r1=481504&r2=481505
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/engine/AxisEngine.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/engine/AxisEngine.java Fri Dec 1 21:38:19 2006
@@ -143,7 +143,7 @@
* @see Phase
* @see Handler
*/
- public void receive(MessageContext msgContext) throws AxisFault {
+ public InvocationResponse receive(MessageContext msgContext) throws AxisFault {
if(log.isTraceEnabled()){
log.trace("receive:"+msgContext.getMessageID());
}
@@ -174,12 +174,12 @@
}
else if (pi.equals(InvocationResponse.SUSPEND))
{
- return;
+ return pi;
}
else if (pi.equals(InvocationResponse.ABORT))
{
flowComplete(msgContext, true);
- return;
+ return pi;
}
else
{
@@ -193,6 +193,8 @@
flowComplete(msgContext, true);
throw e;
}
+
+ return InvocationResponse.CONTINUE;
}
/**
@@ -378,7 +380,7 @@
* @param msgContext
* @throws AxisFault
*/
- public void receiveFault(MessageContext msgContext) throws AxisFault {
+ public InvocationResponse receiveFault(MessageContext msgContext) throws AxisFault {
log.debug(Messages.getMessage("receivederrormessage",
msgContext.getMessageID()));
ConfigurationContext confContext = msgContext.getConfigurationContext();
@@ -409,12 +411,12 @@
}
else if (pi.equals(InvocationResponse.SUSPEND))
{
- return;
+ return pi;
}
else if (pi.equals(InvocationResponse.ABORT))
{
flowComplete(msgContext, true);
- return;
+ return pi;
}
else
{
@@ -428,6 +430,8 @@
flowComplete(msgContext, true);
throw e;
}
+
+ return InvocationResponse.CONTINUE;
}
/**
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/RequestResponseTransport.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/RequestResponseTransport.java?view=diff&rev=481505&r1=481504&r2=481505
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/RequestResponseTransport.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/RequestResponseTransport.java Fri Dec 1 21:38:19 2006
@@ -35,8 +35,29 @@
public static final String TRANSPORT_CONTROL
= "RequestResponseTransportControl";
+ /**
+ * Notify the transport that a message should be acknowledged at this time.
+ *
+ * @param msgContext
+ * @throws AxisFault
+ */
public void acknowledgeMessage(MessageContext msgContext) throws AxisFault;
- //public void suspendOnReturn(MessageContext msgContext);
- //public void processResponse(MessageContext msgContext);
+ /**
+ * Pause execution and wait for a response message to be ready. This will
+ * typically be called by the transport after a message has been paused and
+ * will cause the transport to block until a response message is ready to be
+ * returned. This is required to enable RM for in-out MEPs over a
+ * request/response transport; without it the message would be paused and the
+ * transport would simply ack the request.
+ *
+ * @throws InterruptedException
+ */
+ public void awaitResponse() throws InterruptedException;
+
+ /**
+ * Signal that a response has been created and is ready for transmission. This
+ * should release anyone who is blocked on an awaitResponse().
+ */
+ public void signalResponseReady();
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/AxisServlet.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/AxisServlet.java?view=diff&rev=481505&r1=481504&r2=481505
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/AxisServlet.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/AxisServlet.java Fri Dec 1 21:38:19 2006
@@ -17,6 +17,8 @@
package org.apache.axis2.transport.http;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingHelper;
@@ -32,6 +34,7 @@
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.engine.ListenerManager;
+import org.apache.axis2.engine.Handler.InvocationResponse;
import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.TransportListener;
import org.apache.axis2.transport.http.server.HttpUtils;
@@ -249,10 +252,15 @@
try {
// adding ServletContext into msgContext;
out = res.getOutputStream();
- HTTPTransportUtils.processHTTPPostRequest(msgContext, req.getInputStream(), out,
+ InvocationResponse pi = HTTPTransportUtils.processHTTPPostRequest(msgContext, req.getInputStream(), out,
req.getContentType(), req.getHeader(HTTPConstants.HEADER_SOAP_ACTION),
req.getRequestURL().toString());
+ if (pi.equals(InvocationResponse.SUSPEND))
+ {
+ ((RequestResponseTransport)msgContext.getProperty(RequestResponseTransport.TRANSPORT_CONTROL)).awaitResponse();
+ }
+
Object contextWritten =
msgContext.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
@@ -547,6 +555,7 @@
class ServletRequestResponseTransport implements RequestResponseTransport
{
private HttpServletResponse response;
+ private CountDownLatch responseReadySignal = new CountDownLatch(1);
ServletRequestResponseTransport(HttpServletResponse response)
{
@@ -555,6 +564,7 @@
public void acknowledgeMessage(MessageContext msgContext) throws AxisFault
{
+ log.debug("Acking one-way request");
response.setContentType("text/xml; charset="
+ msgContext.getProperty(Constants.Configuration.CHARACTER_SET_ENCODING));
@@ -567,6 +577,21 @@
{
throw new AxisFault("Error sending acknowledgement", e);
}
+
+ signalResponseReady();
+ }
+
+ public void awaitResponse()
+ throws InterruptedException
+ {
+ log.debug("Blocking servlet thread -- awaiting response");
+ responseReadySignal.await();
+ }
+
+ public void signalResponseReady()
+ {
+ log.debug("Signalling response available");
+ responseReadySignal.countDown();
}
}
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPTransportUtils.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPTransportUtils.java?view=diff&rev=481505&r1=481504&r2=481505
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPTransportUtils.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPTransportUtils.java Fri Dec 1 21:38:19 2006
@@ -35,6 +35,7 @@
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.Handler.InvocationResponse;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.util.JavaUtils;
import org.apache.axis2.util.Utils;
@@ -160,12 +161,14 @@
private static final int VERSION_SOAP11 = 1;
private static final int VERSION_SOAP12 = 2;
- public static void processHTTPPostRequest(MessageContext msgContext, InputStream in,
+ public static InvocationResponse processHTTPPostRequest(MessageContext msgContext, InputStream in,
OutputStream out, String contentType, String soapActionHeader, String requestURI)
throws AxisFault {
int soapVersion = VERSION_UNKNOWN;
+ InvocationResponse pi = InvocationResponse.CONTINUE;
+
try {
Map headers = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS);
@@ -321,10 +324,12 @@
AxisEngine engine = new AxisEngine(msgContext.getConfigurationContext());
if (envelope.getBody().hasFault()) {
- engine.receiveFault(msgContext);
+ pi = engine.receiveFault(msgContext);
} else {
- engine.receive(msgContext);
+ pi = engine.receive(msgContext);
}
+
+ return pi;
} catch (SOAPProcessingException e) {
throw new AxisFault(e);
} catch (AxisFault e) {
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPWorker.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPWorker.java?view=diff&rev=481505&r1=481504&r2=481505
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPWorker.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPWorker.java Fri Dec 1 21:38:19 2006
@@ -16,6 +16,9 @@
package org.apache.axis2.transport.http;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+
+import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -23,6 +26,8 @@
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.Handler.InvocationResponse;
+import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.http.server.HttpUtils;
import org.apache.axis2.transport.http.server.OutputBuffer;
import org.apache.axis2.transport.http.server.Worker;
@@ -221,13 +226,15 @@
OutputBuffer outbuffer = new OutputBuffer();
msgContext.setProperty(MessageContext.TRANSPORT_OUT, outbuffer);
msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, outbuffer);
+ msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
+ new SimpleHTTPRequestResponseTransport());
HttpEntity inentity = ((HttpEntityEnclosingRequest) request).getEntity();
String contenttype = null;
if (inentity.getContentType() != null) {
contenttype = inentity.getContentType().getValue();
}
- HTTPTransportUtils.processHTTPPostRequest(
+ InvocationResponse pi = HTTPTransportUtils.processHTTPPostRequest(
msgContext,
inentity.getContent(),
outbuffer.getOutputStream(),
@@ -235,6 +242,18 @@
soapAction,
uri);
+ if (pi.equals(InvocationResponse.SUSPEND))
+ {
+ try
+ {
+ ((RequestResponseTransport)msgContext.getProperty(RequestResponseTransport.TRANSPORT_CONTROL)).awaitResponse();
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("We were interrupted, so this may not function correctly:"+ e.getMessage());
+ }
+ }
+
outbuffer.setChunked(chunked);
response.setEntity(outbuffer);
@@ -263,4 +282,25 @@
}
}
+ class SimpleHTTPRequestResponseTransport implements RequestResponseTransport
+ {
+ private CountDownLatch responseReadySignal = new CountDownLatch(1);
+
+ public void acknowledgeMessage(MessageContext msgContext) throws AxisFault
+ {
+ //TODO: Once the core HTTP API allows us to return an ack before unwinding, then this should be fixed
+ signalResponseReady();
+ }
+
+ public void awaitResponse()
+ throws InterruptedException
+ {
+ responseReadySignal.await();
+ }
+
+ public void signalResponseReady()
+ {
+ responseReadySignal.countDown();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org