You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by na...@apache.org on 2006/12/02 06:38:23 UTC

svn commit: r481505 - in /webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2: engine/ transport/ transport/http/

Author: nagy
Date: Fri Dec  1 21:38:19 2006
New Revision: 481505

URL: http://svn.apache.org/viewvc?view=rev&rev=481505
Log:
Extended RequestResponseTransport to allow the transport to block after control is returned (e.g. if the message is paused) and then resume once a response is available.  This will enable RM to be used for IN-OUT MEPs over a request/response transport.

Modified:
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/engine/AxisEngine.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/RequestResponseTransport.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/AxisServlet.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPTransportUtils.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPWorker.java

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/engine/AxisEngine.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/engine/AxisEngine.java?view=diff&rev=481505&r1=481504&r2=481505
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/engine/AxisEngine.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/engine/AxisEngine.java Fri Dec  1 21:38:19 2006
@@ -143,7 +143,7 @@
      * @see Phase
      * @see Handler
      */
-    public void receive(MessageContext msgContext) throws AxisFault {
+    public InvocationResponse receive(MessageContext msgContext) throws AxisFault {
         if(log.isTraceEnabled()){
             log.trace("receive:"+msgContext.getMessageID());
         }
@@ -174,12 +174,12 @@
           }
           else if (pi.equals(InvocationResponse.SUSPEND))
           {
-            return;
+            return pi;
           }
           else if (pi.equals(InvocationResponse.ABORT))
           {
             flowComplete(msgContext, true);
-            return;
+            return pi;
           }
           else
           {
@@ -193,6 +193,8 @@
           flowComplete(msgContext, true);
           throw e;
         }
+        
+        return InvocationResponse.CONTINUE;
     }
 
     /**
@@ -378,7 +380,7 @@
      * @param msgContext
      * @throws AxisFault
      */
-    public void receiveFault(MessageContext msgContext) throws AxisFault {
+    public InvocationResponse receiveFault(MessageContext msgContext) throws AxisFault {
     	log.debug(Messages.getMessage("receivederrormessage",
                 msgContext.getMessageID()));
         ConfigurationContext confContext = msgContext.getConfigurationContext();
@@ -409,12 +411,12 @@
           }
           else if (pi.equals(InvocationResponse.SUSPEND))
           {
-            return;
+            return pi;
           }
           else if (pi.equals(InvocationResponse.ABORT))
           {
             flowComplete(msgContext, true);
-            return;
+            return pi;
           }
           else
           {
@@ -428,6 +430,8 @@
           flowComplete(msgContext, true);
           throw e;
         }
+        
+        return InvocationResponse.CONTINUE;
     }
 
     /**

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/RequestResponseTransport.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/RequestResponseTransport.java?view=diff&rev=481505&r1=481504&r2=481505
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/RequestResponseTransport.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/RequestResponseTransport.java Fri Dec  1 21:38:19 2006
@@ -35,8 +35,29 @@
   public static final String TRANSPORT_CONTROL
     = "RequestResponseTransportControl";
 
+  /**
+   * Notify the transport that a message should be acknowledged at this time.
+   * 
+   * @param msgContext
+   * @throws AxisFault
+   */
   public void acknowledgeMessage(MessageContext msgContext) throws AxisFault;
   
-  //public void suspendOnReturn(MessageContext msgContext);
-  //public void processResponse(MessageContext msgContext);
+  /**
+   * Pause execution and wait for a response message to be ready.  This will
+   * typically be called by the transport after a message has been paused and
+   * will cause the transport to block until a response message is ready to be
+   * returned.  This is required to enable RM for in-out MEPs over a
+   * request/response transport; without it the message would be paused and the
+   * transport would simply ack the request.
+   *  
+   * @throws InterruptedException
+   */
+  public void awaitResponse() throws InterruptedException;
+  
+  /**
+   * Signal that a response has be created and is ready for transmission.  This
+   * should release anyone who is blocked on a awaitResponse().
+   */
+  public void signalResponseReady();
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/AxisServlet.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/AxisServlet.java?view=diff&rev=481505&r1=481504&r2=481505
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/AxisServlet.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/AxisServlet.java Fri Dec  1 21:38:19 2006
@@ -17,6 +17,8 @@
 
 package org.apache.axis2.transport.http;
 
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.AddressingHelper;
@@ -32,6 +34,7 @@
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.engine.ListenerManager;
+import org.apache.axis2.engine.Handler.InvocationResponse;
 import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.transport.TransportListener;
 import org.apache.axis2.transport.http.server.HttpUtils;
@@ -249,10 +252,15 @@
             try {
                 // adding ServletContext into msgContext;
                 out = res.getOutputStream();
-                HTTPTransportUtils.processHTTPPostRequest(msgContext, req.getInputStream(), out,
+                InvocationResponse pi = HTTPTransportUtils.processHTTPPostRequest(msgContext, req.getInputStream(), out,
                         req.getContentType(), req.getHeader(HTTPConstants.HEADER_SOAP_ACTION),
                         req.getRequestURL().toString());
 
+                if (pi.equals(InvocationResponse.SUSPEND))
+                {
+                  ((RequestResponseTransport)msgContext.getProperty(RequestResponseTransport.TRANSPORT_CONTROL)).awaitResponse();
+                }
+                
                 Object contextWritten =
                         msgContext.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
 
@@ -547,6 +555,7 @@
     class ServletRequestResponseTransport implements RequestResponseTransport
     {
       private HttpServletResponse response;
+      private CountDownLatch responseReadySignal = new CountDownLatch(1);
       
       ServletRequestResponseTransport(HttpServletResponse response)
       {
@@ -555,6 +564,7 @@
       
       public void acknowledgeMessage(MessageContext msgContext) throws AxisFault
       {
+        log.debug("Acking one-way request");
         response.setContentType("text/xml; charset="
                                 + msgContext.getProperty(Constants.Configuration.CHARACTER_SET_ENCODING));
         
@@ -567,6 +577,21 @@
         {
           throw new AxisFault("Error sending acknowledgement", e);
         }
+        
+        signalResponseReady();
+      }
+      
+      public void awaitResponse()
+      throws InterruptedException
+      {
+        log.debug("Blocking servlet thread -- awaiting response");
+        responseReadySignal.await();
+      }
+      
+      public void signalResponseReady()
+      {
+        log.debug("Signalling response available");
+        responseReadySignal.countDown();
       }
     }
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPTransportUtils.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPTransportUtils.java?view=diff&rev=481505&r1=481504&r2=481505
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPTransportUtils.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPTransportUtils.java Fri Dec  1 21:38:19 2006
@@ -35,6 +35,7 @@
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.Parameter;
 import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.Handler.InvocationResponse;
 import org.apache.axis2.transport.TransportUtils;
 import org.apache.axis2.util.JavaUtils;
 import org.apache.axis2.util.Utils;
@@ -160,12 +161,14 @@
     private static final int VERSION_SOAP11 = 1;
     private static final int VERSION_SOAP12 = 2;
 
-    public static void processHTTPPostRequest(MessageContext msgContext, InputStream in,
+    public static InvocationResponse processHTTPPostRequest(MessageContext msgContext, InputStream in,
                                               OutputStream out, String contentType, String soapActionHeader, String requestURI)
             throws AxisFault {
 
         int soapVersion = VERSION_UNKNOWN;
 
+        InvocationResponse pi = InvocationResponse.CONTINUE;
+        
         try {
 
             Map headers = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS);
@@ -321,10 +324,12 @@
             AxisEngine engine = new AxisEngine(msgContext.getConfigurationContext());
 
             if (envelope.getBody().hasFault()) {
-                engine.receiveFault(msgContext);
+                pi = engine.receiveFault(msgContext);
             } else {
-                engine.receive(msgContext);
+                pi = engine.receive(msgContext);
             }
+            
+            return pi;
         } catch (SOAPProcessingException e) {
             throw new AxisFault(e);
         } catch (AxisFault e) {

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPWorker.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPWorker.java?view=diff&rev=481505&r1=481504&r2=481505
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPWorker.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/transport/http/HTTPWorker.java Fri Dec  1 21:38:19 2006
@@ -16,6 +16,9 @@
 
 package org.apache.axis2.transport.http;
 
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+
+import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
@@ -23,6 +26,8 @@
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.Parameter;
 import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.Handler.InvocationResponse;
+import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.transport.http.server.HttpUtils;
 import org.apache.axis2.transport.http.server.OutputBuffer;
 import org.apache.axis2.transport.http.server.Worker;
@@ -221,13 +226,15 @@
             OutputBuffer outbuffer = new OutputBuffer();
             msgContext.setProperty(MessageContext.TRANSPORT_OUT, outbuffer);
             msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, outbuffer);
+            msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
+                                   new SimpleHTTPRequestResponseTransport());
 
             HttpEntity inentity = ((HttpEntityEnclosingRequest) request).getEntity();
             String contenttype = null;
             if (inentity.getContentType() != null) {
                 contenttype = inentity.getContentType().getValue();
             }
-            HTTPTransportUtils.processHTTPPostRequest(
+            InvocationResponse pi = HTTPTransportUtils.processHTTPPostRequest(
                     msgContext,
                     inentity.getContent(),
                     outbuffer.getOutputStream(),
@@ -235,6 +242,18 @@
                     soapAction,
                     uri);
 
+            if (pi.equals(InvocationResponse.SUSPEND))
+            {
+              try
+              {
+                ((RequestResponseTransport)msgContext.getProperty(RequestResponseTransport.TRANSPORT_CONTROL)).awaitResponse();
+              }
+              catch (InterruptedException e)
+              {
+                throw new IOException("We were interrupted, so this may not function correctly:"+ e.getMessage());
+              }
+            }
+            
             outbuffer.setChunked(chunked);
             response.setEntity(outbuffer);
 
@@ -263,4 +282,25 @@
         }
     }
 
+    class SimpleHTTPRequestResponseTransport implements RequestResponseTransport
+    {
+      private CountDownLatch responseReadySignal = new CountDownLatch(1);
+      
+      public void acknowledgeMessage(MessageContext msgContext) throws AxisFault
+      {
+        //TODO: Once the core HTTP API allows us to return an ack before unwinding, then the should be fixed
+        signalResponseReady();
+      }
+      
+      public void awaitResponse()
+      throws InterruptedException
+      {
+        responseReadySignal.await();
+      }
+      
+      public void signalResponseReady()
+      {
+        responseReadySignal.countDown();
+      }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org