You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/09/11 12:26:44 UTC

svn commit: r694207 - in /servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http: endpoints/HttpConsumerEndpoint.java processors/ConsumerProcessor.java

Author: gnodet
Date: Thu Sep 11 03:26:43 2008
New Revision: 694207

URL: http://svn.apache.org/viewvc?rev=694207&view=rev
Log:
SM-1407: Fix problem with some exchanges not sent back in ERROR when a timeout occur

Modified:
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=694207&r1=694206&r2=694207&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Thu Sep 11 03:26:43 2008
@@ -91,30 +91,40 @@
     }
 
     /**
-     * @return the locationUri
+     * Returns the URI at which the endpoint listens for new requests.
+     * 
+     * @return a string representing the endpoint's URI
      */
     public String getLocationURI() {
         return locationURI;
     }
 
     /**
-     * @param locationURI
-     *            the locationUri to set
+     * Sets the URI at which an endpoint listens for requests.
+     * 
+     * @param locationURI a string representing the URI
+     * @org.apache.xbean.Property description="the URI at which the endpoint listens for requests"
      */
     public void setLocationURI(String locationURI) {
         this.locationURI = locationURI;
     }
 
     /**
-     * @return the timeout
+     * Returns the timeout value for an HTTP endpoint.
+     * 
+     * @return the timeout specified in milliseconds
      */
     public long getTimeout() {
         return timeout;
     }
 
     /**
-     * @param timeout
-     *            the timeout to set
+     * Specifies the timeout value for an HTTP consumer endpoint. The timeout is specified in milliseconds. The default value is 0
+     * which means that the endpoint will never timeout.
+     * 
+     * @org.apache.xbean.Property description="the timeout is specified in milliseconds. The default value is 0 which 
+     *       means that the endpoint will never timeout."
+     * @param timeout the length time, in milliseconds, to wait before timing out
      */
     public void setTimeout(long timeout) {
         this.timeout = timeout;
@@ -128,23 +138,31 @@
     }
 
     /**
-     * @param marshaler
-     *            the marshaler to set
+     * Sets the class used to marshal messages.
+     * 
+     * @param marshaler the marshaler to set
+     * @org.apache.xbean.Property description="the bean used to marshal HTTP messages. The default is a
+     *                            <code>DefaultHttpConsumerMarshaler</code>."
      */
     public void setMarshaler(HttpConsumerMarshaler marshaler) {
         this.marshaler = marshaler;
     }
 
     /**
-     * @return the authMethod
+     * Returns a string describing the authentication scheme being used by an endpoint.
+     * 
+     * @return a string representing the authentication method used by an endpoint
      */
     public String getAuthMethod() {
         return authMethod;
     }
 
     /**
-     * @param authMethod
-     *            the authMethod to set
+     * Specifies the authentication method used by a secure endpoint. The authentication method is a string naming the scheme used
+     * for authenticating users.
+     * 
+     * @param authMethod a string naming the authentication scheme a secure endpoint should use
+     * @org.apache.xbean.Property description="a string naming the scheme used for authenticating users"
      */
     public void setAuthMethod(String authMethod) {
         this.authMethod = authMethod;
@@ -158,23 +176,31 @@
     }
 
     /**
-     * @param ssl
-     *            the sslParameters to set
+     * Sets the properties used to configure SSL for the endpoint.
+     * 
+     * @param ssl an <code>SslParameters</code> object containing the SSL properties
+     * @org.apache.xbean.Property description="a bean containing the SSL configuration properties"
      */
     public void setSsl(SslParameters ssl) {
         this.ssl = ssl;
     }
 
     /**
-     * @return defaultMep of the endpoint
+     * Returns a URI representing the default message exachange pattern(MEP) used by an endpoint.
+     * 
+     * @return a URI representing an endpoint's default MEP
      */
     public URI getDefaultMep() {
         return defaultMep;
     }
 
     /**
-     * @param defaultMep -
-     *            defaultMep of the endpoint
+     * Sets the default message exchange pattern(MEP) for an endpoint. The default MEP is specified as a URI and the default is
+     * <code>JbiConstants.IN_OUT</code>.
+     * 
+     * @param defaultMep a URI representing the default MEP of the endpoint
+     * @org.apache.xbean.Property description="a URI representing the endpoint's default MEP. The default is
+     *                            <code>JbiConstants.IN_OUT</code>."
      */
     public void setDefaultMep(URI defaultMep) {
         this.defaultMep = defaultMep;
@@ -193,15 +219,25 @@
     }
 
     public void process(MessageExchange exchange) throws Exception {
-        Continuation cont = locks.remove(exchange.getExchangeId());
+        // Receive the exchange response
+        // First, check if the continuation has not been removed from the map,
+        // which would mean it has timed out.  If this is the case, throw an exception
+        // that will set the exchange status to ERROR.
+        Continuation cont = locks.get(exchange.getExchangeId());
         if (cont == null) {
-            throw new Exception("HTTP request has timed out");
+            throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
         }
+        // synchronized block
         synchronized (cont) {
+            if (locks.remove(exchange.getExchangeId()) == null) {
+                throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
+            }
             if (logger.isDebugEnabled()) {
                 logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
             }
+            // Put the new exchange
             exchanges.put(exchange.getExchangeId(), exchange);
+            // Resume continuation
             cont.resume();
         }
     }
@@ -216,46 +252,77 @@
             if (handleStaticResource(request, response)) {
                 return;
             }
-            // Not giving a specific mutex will synchronize on the continuation itself
+            // Not giving a specific mutex will synchronize on the continuation
+            // itself
             Continuation cont = ContinuationSupport.getContinuation(request, null);
             // If the continuation is not a retry
             if (!cont.isPending()) {
+                // Create the exchange
                 exchange = createExchange(request);
-                locks.put(exchange.getExchangeId(), cont);
+                // Put the exchange in a map so that we can later retrieve it
+                // We don't put the exchange on the request directly in case the JMS flow is involved
+                // because the exchange coming back may not be the same object as the one send.
+                exchanges.put(exchange.getExchangeId(), exchange);
+                // Put the exchange id on the request to be able to retrieve the exchange later
                 request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
+                // Put the continuation in a map under the exchange id key
+                locks.put(exchange.getExchangeId(), cont);
                 synchronized (cont) {
+                    // Send the exchange
                     send(exchange);
                     if (logger.isDebugEnabled()) {
                         logger.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
                     }
+                    // Suspend the continuation for the configured timeout
+                    // If a SelectConnector is used, the call to suspend will throw a RetryRequest exception
+                    // else, the call will block until the continuation is resumed
                     long to = this.timeout;
                     if (to == 0) {
-                        to = ((HttpComponent) getServiceUnit().getComponent()).getConfiguration()
-                                            .getConsumerProcessorSuspendTime();
+                        to = ((HttpComponent) getServiceUnit().getComponent()).getConfiguration().getConsumerProcessorSuspendTime();
                     }
-                    exchanges.put(exchange.getExchangeId(), exchange);
                     boolean result = cont.suspend(to);
+                    // The call has not thrown a RetryRequest, which means we don't use a SelectConnector
+                    // and we must handle the exchange in this very method call.
+                    // If result is false, the continuation has timed out.
+                    // So get the exchange (in case the object has changed) and remove it from the map
                     exchange = exchanges.remove(exchange.getExchangeId());
+                    // remove the exchange id from the request as we don't need it anymore
+                    request.removeAttribute(MessageExchange.class.getName());
+                    // If a timeout occurred, throw an exception that will be sent back to the HTTP client
+                    // Whenever the exchange comes back, the process(MessageExchange) method will thrown an
+                    // exception and the exchange will be set in an ERROR status
                     if (!result) {
+                        // Remove the continuation from the map.
+                        // This indicates the continuation has been fully processed
                         locks.remove(exchange.getExchangeId());
                         throw new Exception("Exchange timed out");
                     }
-                    request.removeAttribute(MessageExchange.class.getName());
                 }
+            // The continuation is a retry.
+            // This happens when the SelectConnector is used and in two cases:
+            //  * the continuation has been resumed because the exchange has been received
+            //  * the continuation has timed out
             } else {
-                String id = (String) request.getAttribute(MessageExchange.class.getName());
-                locks.remove(id);
-                exchange = exchanges.remove(id);
-                request.removeAttribute(MessageExchange.class.getName());
-                boolean result = cont.suspend(0);
-                // Check if this is a timeout
-                if (exchange == null) {
-                    throw new IllegalStateException("Exchange not found");
-                }
-                if (!result) {
-                    throw new Exception("Timeout");
+                synchronized (cont) {
+                    // Get the exchange id from the request
+                    String id = (String) request.getAttribute(MessageExchange.class.getName());
+                    // Remove the continuation from the map, indicating it has been processed or timed out
+                    locks.remove(id);
+                    exchange = exchanges.remove(id);
+                    request.removeAttribute(MessageExchange.class.getName());
+                    // Check if this is a timeout
+                    if (exchange == null) {
+                        throw new IllegalStateException("Exchange not found");
+                    }
+                    if (!cont.isResumed()) {
+                        Exception e = new Exception("Exchange timed out: " + exchange.getExchangeId());
+                        fail(exchange, e);
+                        throw e;
+                    }
                 }
             }
+            // At this point, we have received the exchange response,
+            // so process it and send back the HTTP response
             if (exchange.getStatus() == ExchangeStatus.ERROR) {
                 Exception e = exchange.getError();
                 if (e == null) {
@@ -273,11 +340,9 @@
                             sendOut(exchange, outMsg, request, response);
                         }
                     }
-                    exchange.setStatus(ExchangeStatus.DONE);
-                    send(exchange);
+                    done(exchange);
                 } catch (Exception e) {
-                    exchange.setError(e);
-                    send(exchange);
+                    fail(exchange, e);
                     throw e;
                 }
             } else if (exchange.getStatus() == ExchangeStatus.DONE) {
@@ -298,10 +363,8 @@
     /**
      * Handle static resources
      * 
-     * @param request
-     *            the http request
-     * @param response
-     *            the http response
+     * @param request the http request
+     * @param response the http response
      * @return <code>true</code> if the request has been handled
      * @throws IOException
      * @throws ServletException
@@ -333,8 +396,8 @@
             response.setStatus(200);
             response.setContentType("text/xml");
             try {
-                new SourceTransformer().toResult(new DOMSource((Node) res),
-                                new StreamResult(response.getOutputStream()));
+                new SourceTransformer().toResult(new DOMSource((Node)res), 
+                                                 new StreamResult(response.getOutputStream()));
             } catch (TransformerException e) {
                 throw new ServletException("Error while sending xml resource", e);
             }
@@ -372,23 +435,23 @@
         return me;
     }
 
-    public void sendAccepted(MessageExchange exchange, HttpServletRequest request, HttpServletResponse response)
-        throws Exception {
+    public void sendAccepted(MessageExchange exchange, HttpServletRequest request,
+                             HttpServletResponse response) throws Exception {
         marshaler.sendAccepted(exchange, request, response);
     }
 
     public void sendError(MessageExchange exchange, Exception error, HttpServletRequest request,
-        HttpServletResponse response) throws Exception {
+                          HttpServletResponse response) throws Exception {
         marshaler.sendError(exchange, error, request, response);
     }
 
     public void sendFault(MessageExchange exchange, Fault fault, HttpServletRequest request,
-        HttpServletResponse response) throws Exception {
+                          HttpServletResponse response) throws Exception {
         marshaler.sendFault(exchange, fault, request, response);
     }
 
     public void sendOut(MessageExchange exchange, NormalizedMessage outMsg, HttpServletRequest request,
-        HttpServletResponse response) throws Exception {
+                        HttpServletResponse response) throws Exception {
         marshaler.sendOut(exchange, outMsg, request, response);
     }
 

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=694207&r1=694206&r2=694207&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Thu Sep 11 03:26:43 2008
@@ -60,10 +60,6 @@
 
 public class ConsumerProcessor extends AbstractProcessor implements ExchangeProcessor, HttpProcessor {
 
-    public static final URI IN_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/in-only");
-    public static final URI IN_OUT = URI.create("http://www.w3.org/2004/08/wsdl/in-out");
-    public static final URI ROBUST_IN_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/robust-in-only");
-    
     private static Log log = LogFactory.getLog(ConsumerProcessor.class);
 
     protected Object httpContext;
@@ -94,17 +90,19 @@
     }
     
     public void process(MessageExchange exchange) throws Exception {
-        Continuation cont = locks.remove(exchange.getExchangeId());
-        if (cont != null) {
-            synchronized (cont) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
-                }
-                exchanges.put(exchange.getExchangeId(), exchange);
-                cont.resume();
+        Continuation cont = locks.get(exchange.getExchangeId());
+        if (cont == null) {
+            throw new Exception("HTTP request has timed out");
+        }
+        synchronized (cont) {
+            if (locks.remove(exchange.getExchangeId()) == null) {
+                throw new Exception("HTTP request has timed out");
             }
-        } else {
-            throw new IllegalStateException("Exchange not found");
+            if (log.isDebugEnabled()) {
+                log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
+            }
+            exchanges.put(exchange.getExchangeId(), exchange);
+            cont.resume();
         }
     }
 
@@ -137,20 +135,10 @@
         // If the continuation is not a retry
         if (!cont.isPending()) {
             try {
-                SoapMessage message = soapHelper.getSoapMarshaler().createReader().read(
-                                            request.getInputStream(), 
-                                            request.getHeader(HEADER_CONTENT_TYPE));
-                Context ctx = soapHelper.createContext(message);
-                if (request.getUserPrincipal() != null) {
-                    if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
-                        Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
-                        ctx.getInMessage().setSubject(subject);
-                    } else {
-                        ctx.getInMessage().addPrincipal(request.getUserPrincipal());
-                    }
-                }
+                Context ctx = createContext(request);
                 request.setAttribute(Context.class.getName(), ctx);
                 exchange = soapHelper.onReceive(ctx);
+                exchanges.put(exchange.getExchangeId(), exchange);
                 NormalizedMessage inMessage = exchange.getMessage("in");
                 if (getConfiguration().isWantHeadersFromHttpIntoExchange()) {
                     inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
@@ -164,11 +152,11 @@
                     }
                     boolean result = cont.suspend(suspentionTime);
                     exchange = exchanges.remove(exchange.getExchangeId());
+                    request.removeAttribute(MessageExchange.class.getName());
                     if (!result) {
                         locks.remove(exchange.getExchangeId());
-                        throw new Exception("Error sending exchange: aborted");
+                        throw new Exception("Exchange timed out");
                     }
-                    request.removeAttribute(MessageExchange.class.getName());
                 }
             } catch (RetryRequest retry) {
                 throw retry;
@@ -176,21 +164,26 @@
                 sendFault(fault, request, response);
                 return;
             } catch (Exception e) {
-                SoapFault fault = new SoapFault(e);
-                sendFault(fault, request, response);
+                sendFault(new SoapFault(e), request, response);
                 return;
             }
         } else {
-            String id = (String) request.getAttribute(MessageExchange.class.getName());
-            exchange = exchanges.remove(id);
-            request.removeAttribute(MessageExchange.class.getName());
-            boolean result = cont.suspend(0); 
-            // Check if this is a timeout
-            if (exchange == null) {
-                throw new IllegalStateException("Exchange not found");
-            }
-            if (!result) {
-                throw new Exception("Timeout");
+            synchronized (cont) {
+                String id = (String) request.getAttribute(MessageExchange.class.getName());
+                locks.remove(id);
+                exchange = exchanges.remove(id);
+                request.removeAttribute(MessageExchange.class.getName());
+                // Check if this is a timeout
+                if (exchange == null) {
+                    throw new IllegalStateException("Exchange not found");
+                }
+                if (!cont.isResumed()) {
+                    Exception e = new Exception("Exchange timed out: " + exchange.getExchangeId());
+                    exchange.setError(e);
+                    channel.send(exchange);
+                    sendFault(new SoapFault(e), request, response);
+                    return;
+                }
             }
         }
         if (exchange.getStatus() == ExchangeStatus.ERROR) {
@@ -216,6 +209,22 @@
         }
     }
 
+    private Context createContext(HttpServletRequest request) throws Exception {
+        SoapMessage message = soapHelper.getSoapMarshaler().createReader().read(
+                                    request.getInputStream(),
+                                    request.getHeader(HEADER_CONTENT_TYPE));
+        Context ctx = soapHelper.createContext(message);
+        if (request.getUserPrincipal() != null) {
+            if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
+                Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
+                ctx.getInMessage().setSubject(subject);
+            } else {
+                ctx.getInMessage().addPrincipal(request.getUserPrincipal());
+            }
+        }
+        return ctx;
+    }
+
     private void processResponse(MessageExchange exchange, HttpServletRequest request, HttpServletResponse response) throws Exception {
         NormalizedMessage outMsg = exchange.getMessage("out");
         if (outMsg != null) {