You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/09/11 12:26:44 UTC
svn commit: r694207 - in
/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http:
endpoints/HttpConsumerEndpoint.java processors/ConsumerProcessor.java
Author: gnodet
Date: Thu Sep 11 03:26:43 2008
New Revision: 694207
URL: http://svn.apache.org/viewvc?rev=694207&view=rev
Log:
SM-1407: Fix problem with some exchanges not sent back in ERROR when a timeout occur
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=694207&r1=694206&r2=694207&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Thu Sep 11 03:26:43 2008
@@ -91,30 +91,40 @@
}
/**
- * @return the locationUri
+ * Returns the URI at which the endpoint listens for new requests.
+ *
+ * @return a string representing the endpoint's URI
*/
public String getLocationURI() {
return locationURI;
}
/**
- * @param locationURI
- * the locationUri to set
+ * Sets the URI at which an endpoint listens for requests.
+ *
+ * @param locationURI a string representing the URI
+ * @org.apache.xbean.Property description="the URI at which the endpoint listens for requests"
*/
public void setLocationURI(String locationURI) {
this.locationURI = locationURI;
}
/**
- * @return the timeout
+ * Returns the timeout value for an HTTP endpoint.
+ *
+ * @return the timeout specified in milliseconds
*/
public long getTimeout() {
return timeout;
}
/**
- * @param timeout
- * the timeout to set
+ * Specifies the timeout value for an HTTP consumer endpoint. The timeout is specified in milliseconds. The default value is 0
+ * which means that the endpoint will never timeout.
+ *
+ * @org.apache.xbean.Property description="the timeout is specified in milliseconds. The default value is 0 which
+ * means that the endpoint will never timeout."
+ * @param timeout the length time, in milliseconds, to wait before timing out
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
@@ -128,23 +138,31 @@
}
/**
- * @param marshaler
- * the marshaler to set
+ * Sets the class used to marshal messages.
+ *
+ * @param marshaler the marshaler to set
+ * @org.apache.xbean.Property description="the bean used to marshal HTTP messages. The default is a
+ * <code>DefaultHttpConsumerMarshaler</code>."
*/
public void setMarshaler(HttpConsumerMarshaler marshaler) {
this.marshaler = marshaler;
}
/**
- * @return the authMethod
+ * Returns a string describing the authentication scheme being used by an endpoint.
+ *
+ * @return a string representing the authentication method used by an endpoint
*/
public String getAuthMethod() {
return authMethod;
}
/**
- * @param authMethod
- * the authMethod to set
+ * Specifies the authentication method used by a secure endpoint. The authentication method is a string naming the scheme used
+ * for authenticating users.
+ *
+ * @param authMethod a string naming the authentication scheme a secure endpoint should use
+ * @org.apache.xbean.Property description="a string naming the scheme used for authenticating users"
*/
public void setAuthMethod(String authMethod) {
this.authMethod = authMethod;
@@ -158,23 +176,31 @@
}
/**
- * @param ssl
- * the sslParameters to set
+ * Sets the properties used to configure SSL for the endpoint.
+ *
+ * @param ssl an <code>SslParameters</code> object containing the SSL properties
+ * @org.apache.xbean.Property description="a bean containing the SSL configuration properties"
*/
public void setSsl(SslParameters ssl) {
this.ssl = ssl;
}
/**
- * @return defaultMep of the endpoint
+ * Returns a URI representing the default message exachange pattern(MEP) used by an endpoint.
+ *
+ * @return a URI representing an endpoint's default MEP
*/
public URI getDefaultMep() {
return defaultMep;
}
/**
- * @param defaultMep -
- * defaultMep of the endpoint
+ * Sets the default message exchange pattern(MEP) for an endpoint. The default MEP is specified as a URI and the default is
+ * <code>JbiConstants.IN_OUT</code>.
+ *
+ * @param defaultMep a URI representing the default MEP of the endpoint
+ * @org.apache.xbean.Property description="a URI representing the endpoint's default MEP. The default is
+ * <code>JbiConstants.IN_OUT</code>."
*/
public void setDefaultMep(URI defaultMep) {
this.defaultMep = defaultMep;
@@ -193,15 +219,25 @@
}
public void process(MessageExchange exchange) throws Exception {
- Continuation cont = locks.remove(exchange.getExchangeId());
+ // Receive the exchange response
+ // First, check if the continuation has not been removed from the map,
+ // which would mean it has timed out. If this is the case, throw an exception
+ // that will set the exchange status to ERROR.
+ Continuation cont = locks.get(exchange.getExchangeId());
if (cont == null) {
- throw new Exception("HTTP request has timed out");
+ throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
}
+ // synchronized block
synchronized (cont) {
+ if (locks.remove(exchange.getExchangeId()) == null) {
+ throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
+ }
if (logger.isDebugEnabled()) {
logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
}
+ // Put the new exchange
exchanges.put(exchange.getExchangeId(), exchange);
+ // Resume continuation
cont.resume();
}
}
@@ -216,46 +252,77 @@
if (handleStaticResource(request, response)) {
return;
}
- // Not giving a specific mutex will synchronize on the continuation itself
+ // Not giving a specific mutex will synchronize on the continuation
+ // itself
Continuation cont = ContinuationSupport.getContinuation(request, null);
// If the continuation is not a retry
if (!cont.isPending()) {
+ // Create the exchange
exchange = createExchange(request);
- locks.put(exchange.getExchangeId(), cont);
+ // Put the exchange in a map so that we can later retrieve it
+ // We don't put the exchange on the request directly in case the JMS flow is involved
+ // because the exchange coming back may not be the same object as the one send.
+ exchanges.put(exchange.getExchangeId(), exchange);
+ // Put the exchange id on the request to be able to retrieve the exchange later
request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
+ // Put the continuation in a map under the exchange id key
+ locks.put(exchange.getExchangeId(), cont);
synchronized (cont) {
+ // Send the exchange
send(exchange);
if (logger.isDebugEnabled()) {
logger.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
}
+ // Suspend the continuation for the configured timeout
+ // If a SelectConnector is used, the call to suspend will throw a RetryRequest exception
+ // else, the call will block until the continuation is resumed
long to = this.timeout;
if (to == 0) {
- to = ((HttpComponent) getServiceUnit().getComponent()).getConfiguration()
- .getConsumerProcessorSuspendTime();
+ to = ((HttpComponent) getServiceUnit().getComponent()).getConfiguration().getConsumerProcessorSuspendTime();
}
- exchanges.put(exchange.getExchangeId(), exchange);
boolean result = cont.suspend(to);
+ // The call has not thrown a RetryRequest, which means we don't use a SelectConnector
+ // and we must handle the exchange in this very method call.
+ // If result is false, the continuation has timed out.
+ // So get the exchange (in case the object has changed) and remove it from the map
exchange = exchanges.remove(exchange.getExchangeId());
+ // remove the exchange id from the request as we don't need it anymore
+ request.removeAttribute(MessageExchange.class.getName());
+ // If a timeout occurred, throw an exception that will be sent back to the HTTP client
+ // Whenever the exchange comes back, the process(MessageExchange) method will thrown an
+ // exception and the exchange will be set in an ERROR status
if (!result) {
+ // Remove the continuation from the map.
+ // This indicates the continuation has been fully processed
locks.remove(exchange.getExchangeId());
throw new Exception("Exchange timed out");
}
- request.removeAttribute(MessageExchange.class.getName());
}
+ // The continuation is a retry.
+ // This happens when the SelectConnector is used and in two cases:
+ // * the continuation has been resumed because the exchange has been received
+ // * the continuation has timed out
} else {
- String id = (String) request.getAttribute(MessageExchange.class.getName());
- locks.remove(id);
- exchange = exchanges.remove(id);
- request.removeAttribute(MessageExchange.class.getName());
- boolean result = cont.suspend(0);
- // Check if this is a timeout
- if (exchange == null) {
- throw new IllegalStateException("Exchange not found");
- }
- if (!result) {
- throw new Exception("Timeout");
+ synchronized (cont) {
+ // Get the exchange id from the request
+ String id = (String) request.getAttribute(MessageExchange.class.getName());
+ // Remove the continuation from the map, indicating it has been processed or timed out
+ locks.remove(id);
+ exchange = exchanges.remove(id);
+ request.removeAttribute(MessageExchange.class.getName());
+ // Check if this is a timeout
+ if (exchange == null) {
+ throw new IllegalStateException("Exchange not found");
+ }
+ if (!cont.isResumed()) {
+ Exception e = new Exception("Exchange timed out: " + exchange.getExchangeId());
+ fail(exchange, e);
+ throw e;
+ }
}
}
+ // At this point, we have received the exchange response,
+ // so process it and send back the HTTP response
if (exchange.getStatus() == ExchangeStatus.ERROR) {
Exception e = exchange.getError();
if (e == null) {
@@ -273,11 +340,9 @@
sendOut(exchange, outMsg, request, response);
}
}
- exchange.setStatus(ExchangeStatus.DONE);
- send(exchange);
+ done(exchange);
} catch (Exception e) {
- exchange.setError(e);
- send(exchange);
+ fail(exchange, e);
throw e;
}
} else if (exchange.getStatus() == ExchangeStatus.DONE) {
@@ -298,10 +363,8 @@
/**
* Handle static resources
*
- * @param request
- * the http request
- * @param response
- * the http response
+ * @param request the http request
+ * @param response the http response
* @return <code>true</code> if the request has been handled
* @throws IOException
* @throws ServletException
@@ -333,8 +396,8 @@
response.setStatus(200);
response.setContentType("text/xml");
try {
- new SourceTransformer().toResult(new DOMSource((Node) res),
- new StreamResult(response.getOutputStream()));
+ new SourceTransformer().toResult(new DOMSource((Node)res),
+ new StreamResult(response.getOutputStream()));
} catch (TransformerException e) {
throw new ServletException("Error while sending xml resource", e);
}
@@ -372,23 +435,23 @@
return me;
}
- public void sendAccepted(MessageExchange exchange, HttpServletRequest request, HttpServletResponse response)
- throws Exception {
+ public void sendAccepted(MessageExchange exchange, HttpServletRequest request,
+ HttpServletResponse response) throws Exception {
marshaler.sendAccepted(exchange, request, response);
}
public void sendError(MessageExchange exchange, Exception error, HttpServletRequest request,
- HttpServletResponse response) throws Exception {
+ HttpServletResponse response) throws Exception {
marshaler.sendError(exchange, error, request, response);
}
public void sendFault(MessageExchange exchange, Fault fault, HttpServletRequest request,
- HttpServletResponse response) throws Exception {
+ HttpServletResponse response) throws Exception {
marshaler.sendFault(exchange, fault, request, response);
}
public void sendOut(MessageExchange exchange, NormalizedMessage outMsg, HttpServletRequest request,
- HttpServletResponse response) throws Exception {
+ HttpServletResponse response) throws Exception {
marshaler.sendOut(exchange, outMsg, request, response);
}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=694207&r1=694206&r2=694207&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Thu Sep 11 03:26:43 2008
@@ -60,10 +60,6 @@
public class ConsumerProcessor extends AbstractProcessor implements ExchangeProcessor, HttpProcessor {
- public static final URI IN_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/in-only");
- public static final URI IN_OUT = URI.create("http://www.w3.org/2004/08/wsdl/in-out");
- public static final URI ROBUST_IN_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/robust-in-only");
-
private static Log log = LogFactory.getLog(ConsumerProcessor.class);
protected Object httpContext;
@@ -94,17 +90,19 @@
}
public void process(MessageExchange exchange) throws Exception {
- Continuation cont = locks.remove(exchange.getExchangeId());
- if (cont != null) {
- synchronized (cont) {
- if (log.isDebugEnabled()) {
- log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
- }
- exchanges.put(exchange.getExchangeId(), exchange);
- cont.resume();
+ Continuation cont = locks.get(exchange.getExchangeId());
+ if (cont == null) {
+ throw new Exception("HTTP request has timed out");
+ }
+ synchronized (cont) {
+ if (locks.remove(exchange.getExchangeId()) == null) {
+ throw new Exception("HTTP request has timed out");
}
- } else {
- throw new IllegalStateException("Exchange not found");
+ if (log.isDebugEnabled()) {
+ log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
+ }
+ exchanges.put(exchange.getExchangeId(), exchange);
+ cont.resume();
}
}
@@ -137,20 +135,10 @@
// If the continuation is not a retry
if (!cont.isPending()) {
try {
- SoapMessage message = soapHelper.getSoapMarshaler().createReader().read(
- request.getInputStream(),
- request.getHeader(HEADER_CONTENT_TYPE));
- Context ctx = soapHelper.createContext(message);
- if (request.getUserPrincipal() != null) {
- if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
- Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
- ctx.getInMessage().setSubject(subject);
- } else {
- ctx.getInMessage().addPrincipal(request.getUserPrincipal());
- }
- }
+ Context ctx = createContext(request);
request.setAttribute(Context.class.getName(), ctx);
exchange = soapHelper.onReceive(ctx);
+ exchanges.put(exchange.getExchangeId(), exchange);
NormalizedMessage inMessage = exchange.getMessage("in");
if (getConfiguration().isWantHeadersFromHttpIntoExchange()) {
inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
@@ -164,11 +152,11 @@
}
boolean result = cont.suspend(suspentionTime);
exchange = exchanges.remove(exchange.getExchangeId());
+ request.removeAttribute(MessageExchange.class.getName());
if (!result) {
locks.remove(exchange.getExchangeId());
- throw new Exception("Error sending exchange: aborted");
+ throw new Exception("Exchange timed out");
}
- request.removeAttribute(MessageExchange.class.getName());
}
} catch (RetryRequest retry) {
throw retry;
@@ -176,21 +164,26 @@
sendFault(fault, request, response);
return;
} catch (Exception e) {
- SoapFault fault = new SoapFault(e);
- sendFault(fault, request, response);
+ sendFault(new SoapFault(e), request, response);
return;
}
} else {
- String id = (String) request.getAttribute(MessageExchange.class.getName());
- exchange = exchanges.remove(id);
- request.removeAttribute(MessageExchange.class.getName());
- boolean result = cont.suspend(0);
- // Check if this is a timeout
- if (exchange == null) {
- throw new IllegalStateException("Exchange not found");
- }
- if (!result) {
- throw new Exception("Timeout");
+ synchronized (cont) {
+ String id = (String) request.getAttribute(MessageExchange.class.getName());
+ locks.remove(id);
+ exchange = exchanges.remove(id);
+ request.removeAttribute(MessageExchange.class.getName());
+ // Check if this is a timeout
+ if (exchange == null) {
+ throw new IllegalStateException("Exchange not found");
+ }
+ if (!cont.isResumed()) {
+ Exception e = new Exception("Exchange timed out: " + exchange.getExchangeId());
+ exchange.setError(e);
+ channel.send(exchange);
+ sendFault(new SoapFault(e), request, response);
+ return;
+ }
}
}
if (exchange.getStatus() == ExchangeStatus.ERROR) {
@@ -216,6 +209,22 @@
}
}
+ private Context createContext(HttpServletRequest request) throws Exception {
+ SoapMessage message = soapHelper.getSoapMarshaler().createReader().read(
+ request.getInputStream(),
+ request.getHeader(HEADER_CONTENT_TYPE));
+ Context ctx = soapHelper.createContext(message);
+ if (request.getUserPrincipal() != null) {
+ if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
+ Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
+ ctx.getInMessage().setSubject(subject);
+ } else {
+ ctx.getInMessage().addPrincipal(request.getUserPrincipal());
+ }
+ }
+ return ctx;
+ }
+
private void processResponse(MessageExchange exchange, HttpServletRequest request, HttpServletResponse response) throws Exception {
NormalizedMessage outMsg = exchange.getMessage("out");
if (outMsg != null) {