You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by jb...@apache.org on 2010/02/26 13:20:23 UTC
svn commit: r916659 -
/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
Author: jbonofre
Date: Fri Feb 26 12:20:23 2010
New Revision: 916659
URL: http://svn.apache.org/viewvc?rev=916659&view=rev
Log:
[SMXCOMP-713] Threading problem in Servicemix-HTTP which renders the component unusable. Thanks to Ryan Moquin.
Modified:
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=916659&r1=916658&r2=916659&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Fri Feb 26 12:20:23 2010
@@ -65,520 +65,448 @@
*/
public class HttpConsumerEndpoint extends ConsumerEndpoint implements HttpProcessor, HttpEndpointType {
- public static final String MAIN_WSDL = "main.wsdl";
-
- private String authMethod;
- private SslParameters ssl;
- private String locationURI;
- private HttpConsumerMarshaler marshaler;
- private long timeout; // 0 => default to the timeout configured on component
- private URI defaultMep = JbiConstants.IN_OUT;
-
- private Map<String, Object> resources = new HashMap<String, Object>();
- private Map<String, Continuation> locks = new ConcurrentHashMap<String, Continuation>();
- private Map<String, MessageExchange> exchanges = new ConcurrentHashMap<String, MessageExchange>();
- private Object httpContext;
-
- private boolean started = false;
- private boolean isSTFlow;
-
- public HttpConsumerEndpoint() {
- super();
- }
-
- public HttpConsumerEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
- super(component, endpoint);
- }
-
- public HttpConsumerEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
- super(serviceUnit, service, endpoint);
- }
-
- /**
- * Returns the URI at which the endpoint listens for new requests.
- *
- * @return a string representing the endpoint's URI
- */
- public String getLocationURI() {
- return locationURI;
- }
-
- /**
- * Sets the URI at which an endpoint listens for requests.
- *
- * @param locationURI a string representing the URI
- * @org.apache.xbean.Property description="the URI at which the endpoint listens for requests"
- */
- public void setLocationURI(String locationURI) {
- this.locationURI = locationURI;
- }
-
- /**
- * Returns the timeout value for an HTTP endpoint.
- *
- * @return the timeout specified in milliseconds
- */
- public long getTimeout() {
- return timeout;
- }
-
- /**
- * Specifies the timeout value for an HTTP consumer endpoint. The timeout is specified in milliseconds. The default value is 0
- * which means that the endpoint will never timeout.
- *
- * @org.apache.xbean.Property description="the timeout is specified in milliseconds. The default value is 0 which
- * means that the endpoint will never timeout."
- * @param timeout the length time, in milliseconds, to wait before timing out
- */
- public void setTimeout(long timeout) {
- this.timeout = timeout;
- }
-
- /**
- * @return the marshaler
- */
- public HttpConsumerMarshaler getMarshaler() {
- return marshaler;
- }
-
- /**
- * Sets the class used to marshal messages.
- *
- * @param marshaler the marshaler to set
- * @org.apache.xbean.Property description="the bean used to marshal HTTP messages. The default is a
- * <code>DefaultHttpConsumerMarshaler</code>."
- */
- public void setMarshaler(HttpConsumerMarshaler marshaler) {
- this.marshaler = marshaler;
- }
-
- /**
- * Returns a string describing the authentication scheme being used by an endpoint.
- *
- * @return a string representing the authentication method used by an endpoint
- */
- public String getAuthMethod() {
- return authMethod;
- }
-
- /**
- * Specifies the authentication method used by a secure endpoint. The authentication method is a string naming the scheme used
- * for authenticating users.
- *
- * @param authMethod a string naming the authentication scheme a secure endpoint should use
- * @org.apache.xbean.Property description="a string naming the scheme used for authenticating users"
- */
- public void setAuthMethod(String authMethod) {
- this.authMethod = authMethod;
- }
-
- /**
- * @return the sslParameters
- */
- public SslParameters getSsl() {
- return ssl;
- }
-
- /**
- * Sets the properties used to configure SSL for the endpoint.
- *
- * @param ssl an <code>SslParameters</code> object containing the SSL properties
- * @org.apache.xbean.Property description="a bean containing the SSL configuration properties"
- */
- public void setSsl(SslParameters ssl) {
- this.ssl = ssl;
- }
-
- /**
- * Returns a URI representing the default message exachange pattern(MEP) used by an endpoint.
- *
- * @return a URI representing an endpoint's default MEP
- */
- public URI getDefaultMep() {
- return defaultMep;
- }
-
- /**
- * Sets the default message exchange pattern(MEP) for an endpoint. The default MEP is specified as a URI and the default is
- * <code>JbiConstants.IN_OUT</code>.
- *
- * @param defaultMep a URI representing the default MEP of the endpoint
- * @org.apache.xbean.Property description="a URI representing the endpoint's default MEP. The default is
- * <code>JbiConstants.IN_OUT</code>."
- */
- public void setDefaultMep(URI defaultMep) {
- this.defaultMep = defaultMep;
- }
-
- public void activate() throws Exception {
- super.activate();
- loadStaticResources();
- httpContext = getServerManager().createContext(locationURI, this);
- }
-
- public void deactivate() throws Exception {
- getServerManager().remove(httpContext);
- httpContext = null;
- super.deactivate();
- }
-
- public void start() throws Exception {
- super.start();
- started = true;
- }
-
- public void stop() throws Exception {
- started = false;
- super.stop();
- }
-
- public void process(MessageExchange exchange) throws Exception {
- // Receive the exchange response
- // First, check if the continuation has not been removed from the map,
- // which would mean it has timed out. If this is the case, throw an exception
- // that will set the exchange status to ERROR.
- Continuation cont = locks.get(exchange.getExchangeId());
- if (cont == null) {
- throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
- }
- if (!cont.isPending()) {
- isSTFlow = true;
- } else {
- isSTFlow = false;
- // synchronized block
- synchronized (cont.getObject()) {
- if (locks.remove(exchange.getExchangeId()) == null) {
- throw new Exception("HTTP request has timed out for exchange: "
- + exchange.getExchangeId());
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
- }
- // Put the new exchange
- exchanges.put(exchange.getExchangeId(), exchange);
- // Resume continuation
- cont.resume();
- if (!cont.isResumed()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Could not resume continuation for exchange: "
- + exchange.getExchangeId());
- }
- exchanges.remove(exchange.getExchangeId());
- throw new Exception("HTTP request has timed out for exchange: "
- + exchange.getExchangeId());
- }
- }
- }
- }
-
- public void process(HttpServletRequest request, HttpServletResponse response) throws Exception {
+ public static final String MAIN_WSDL = "main.wsdl";
+ private String authMethod;
+ private SslParameters ssl;
+ private String locationURI;
+ private HttpConsumerMarshaler marshaler;
+ private long timeout; // 0 => default to the timeout configured on component
+ private URI defaultMep = JbiConstants.IN_OUT;
+ private Map<String, Object> resources = new HashMap<String, Object>();
+ private Map<String, Continuation> locks = new ConcurrentHashMap<String, Continuation>();
+ private Object httpContext;
+ private boolean started = false;
+
+ public HttpConsumerEndpoint() {
+ super();
+ }
+
+ public HttpConsumerEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
+ super(component, endpoint);
+ }
+
+ public HttpConsumerEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
+ super(serviceUnit, service, endpoint);
+ }
+
+ /**
+ * Returns the URI at which the endpoint listens for new requests.
+ *
+ * @return a string representing the endpoint's URI
+ */
+ public String getLocationURI() {
+ return locationURI;
+ }
+
+ /**
+ * Sets the URI at which an endpoint listens for requests.
+ *
+ * @param locationURI a string representing the URI
+ * @org.apache.xbean.Property description="the URI at which the endpoint listens for requests"
+ */
+ public void setLocationURI(String locationURI) {
+ this.locationURI = locationURI;
+ }
+
+ /**
+ * Returns the timeout value for an HTTP endpoint.
+ *
+ * @return the timeout specified in milliseconds
+ */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Specifies the timeout value for an HTTP consumer endpoint. The timeout is specified in milliseconds. The default value is 0
+ * which means that the endpoint will never timeout.
+ *
+ * @org.apache.xbean.Property description="the timeout is specified in milliseconds. The default value is 0 which
+ * means that the endpoint will never timeout."
+ * @param timeout the length time, in milliseconds, to wait before timing out
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
+ * @return the marshaler
+ */
+ public HttpConsumerMarshaler getMarshaler() {
+ return marshaler;
+ }
+
+ /**
+ * Sets the class used to marshal messages.
+ *
+ * @param marshaler the marshaler to set
+ * @org.apache.xbean.Property description="the bean used to marshal HTTP messages. The default is a
+ * <code>DefaultHttpConsumerMarshaler</code>."
+ */
+ public void setMarshaler(HttpConsumerMarshaler marshaler) {
+ this.marshaler = marshaler;
+ }
+
+ /**
+ * Returns a string describing the authentication scheme being used by an endpoint.
+ *
+ * @return a string representing the authentication method used by an endpoint
+ */
+ public String getAuthMethod() {
+ return authMethod;
+ }
+
+ /**
+ * Specifies the authentication method used by a secure endpoint. The authentication method is a string naming the scheme used
+ * for authenticating users.
+ *
+ * @param authMethod a string naming the authentication scheme a secure endpoint should use
+ * @org.apache.xbean.Property description="a string naming the scheme used for authenticating users"
+ */
+ public void setAuthMethod(String authMethod) {
+ this.authMethod = authMethod;
+ }
+
+ /**
+ * @return the sslParameters
+ */
+ public SslParameters getSsl() {
+ return ssl;
+ }
+
+ /**
+ * Sets the properties used to configure SSL for the endpoint.
+ *
+ * @param ssl an <code>SslParameters</code> object containing the SSL properties
+ * @org.apache.xbean.Property description="a bean containing the SSL configuration properties"
+ */
+ public void setSsl(SslParameters ssl) {
+ this.ssl = ssl;
+ }
+
+ /**
+ * Returns a URI representing the default message exachange pattern(MEP) used by an endpoint.
+ *
+ * @return a URI representing an endpoint's default MEP
+ */
+ public URI getDefaultMep() {
+ return defaultMep;
+ }
+
+ /**
+ * Sets the default message exchange pattern(MEP) for an endpoint. The default MEP is specified as a URI and the default is
+ * <code>JbiConstants.IN_OUT</code>.
+ *
+ * @param defaultMep a URI representing the default MEP of the endpoint
+ * @org.apache.xbean.Property description="a URI representing the endpoint's default MEP. The default is
+ * <code>JbiConstants.IN_OUT</code>."
+ */
+ public void setDefaultMep(URI defaultMep) {
+ this.defaultMep = defaultMep;
+ }
+
+ public void activate() throws Exception {
+ super.activate();
+ loadStaticResources();
+ httpContext = getServerManager().createContext(locationURI, this);
+ }
+
+ public void deactivate() throws Exception {
+ getServerManager().remove(httpContext);
+ httpContext = null;
+ super.deactivate();
+ }
+
+ public void start() throws Exception {
+ super.start();
+ started = true;
+ }
+
+ public void stop() throws Exception {
+ started = false;
+ super.stop();
+ }
+
+ public void process(MessageExchange exchange) throws Exception {
+ // Receive the exchange response
+ // First, check if the continuation has not been removed from the map,
+ // which would mean it has timed out. If this is the case, throw an exception
+ // that will set the exchange status to ERROR.
+ Continuation cont = locks.get(exchange.getExchangeId());
+ synchronized (cont) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
+ }
+ // Resume continuation
+ cont.resume();
+ }
+ }
+
+ public void process(HttpServletRequest request, HttpServletResponse response) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Receiving HTTP request: " + request);
+ }
+ MessageExchange exchange = null;
+ try {
+ // Handle WSDLs, XSDs
+ if (handleStaticResource(request, response)) {
+ return;
+ }
+ Continuation cont = createContinuation(request);
+ // If the continuation is not a retry
+ if (!cont.isPending()) {
+ // Check endpoint is started
+ if (!started) {
+ response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Endpoint is stopped");
+ return;
+ }
+ // Create the exchange
+ exchange = createExchange(request);
+ // Put the exchange into the continuation for retrieval later.
+ cont.setObject(exchange);
+ // Put the continuation in a map under the exchange id key
+ locks.put(exchange.getExchangeId(), cont);
if (logger.isDebugEnabled()) {
- logger.debug("Receiving HTTP request: " + request);
+ logger.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
}
- MessageExchange exchange = null;
+ long to = this.timeout;
+ if (to == 0) {
+ to = ((HttpComponent) getServiceUnit().getComponent()).getConfiguration().getConsumerProcessorSuspendTime();
+ }
+ synchronized (cont) {
+ // Send the exchange and then suspend the request.
+ send(exchange);
+ // Suspend the continuation for the configured timeout
+ // If a SelectConnector is used, the call to suspend
+ // will throw a RetryRequest exception
+ // else, the call will block until the continuation is
+ // resumed
+ cont.suspend(to);
+ // The call has not thrown a RetryRequest, which means
+ // we don't use a SelectConnector
+ // and we must handle the exchange in this very method
+ // call.
+ // If result is false, the continuation has timed out.
+ locks.remove(exchange.getExchangeId());
+ }
+ } else {
+ // The continuation is a retry.
+ // This happens when the SelectConnector is used and in two cases:
+ // * the continuation has been resumed because the exchange has been received
+ // * the continuation has timed out
+ exchange = (MessageExchange) cont.getObject();
+ // Remove the continuation from the map, indicating it has been processed or timed out
+ locks.remove(exchange.getExchangeId());
+ // Check if this is a timeout
+ //if (exchange == null) {
+ // throw new IllegalStateException("Exchange not found");
+ //}
+ }
+ // At this point, we have received the exchange response,
+ // so process it and send back the HTTP response
+ if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ Exception e = exchange.getError();
+ if (e == null) {
+ e = new Exception("Unkown error (exchange aborted ?)");
+ }
+ throw e;
+ } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
try {
- // Handle WSDLs, XSDs
- if (handleStaticResource(request, response)) {
- return;
- }
- Continuation cont = createContinuation(request);
- // If the continuation is not a retry
- if (!cont.isPending()) {
- // Check endpoint is started
- if (!started) {
- response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Endpoint is stopped");
- return;
- }
- // Create the exchange
- exchange = createExchange(request);
- // Put the exchange in a map so that we can later retrieve it
- // We don't put the exchange on the request directly in case the JMS flow is involved
- // because the exchange coming back may not be the same object as the one send.
- exchanges.put(exchange.getExchangeId(), exchange);
- // Put the exchange id on the request to be able to retrieve the exchange later
- request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
- // Put the continuation in a map under the exchange id key
- locks.put(exchange.getExchangeId(), cont);
- synchronized (cont.getObject()) {
- // Send the exchange
- send(exchange);
- if (!isSTFlow) {
- if (logger.isDebugEnabled()) {
- logger.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
- }
- // Suspend the continuation for the configured timeout
- // If a SelectConnector is used, the call to suspend
- // will throw a RetryRequest exception
- // else, the call will block until the continuation is
- // resumed
- long to = this.timeout;
- if (to == 0) {
- to = ((HttpComponent)getServiceUnit().getComponent()).getConfiguration()
- .getConsumerProcessorSuspendTime();
- }
- boolean result = cont.suspend(to);
- // The call has not thrown a RetryRequest, which means
- // we don't use a SelectConnector
- // and we must handle the exchange in this very method
- // call.
- // If result is false, the continuation has timed out.
- // So get the exchange (in case the object has changed)
- // and remove it from the map
- exchange = exchanges.remove(exchange.getExchangeId());
- // remove the exchange id from the request as we don't
- // need it anymore
- request.removeAttribute(MessageExchange.class.getName());
- // If a timeout occurred, throw an exception that will
- // be sent back to the HTTP client
- // Whenever the exchange comes back, the
- // process(MessageExchange) method will thrown an
- // exception and the exchange will be set in an ERROR
- // status
- if (!result) {
- // Remove the continuation from the map.
- // This indicates the continuation has been fully
- // processed
- locks.remove(exchange.getExchangeId());
- throw new Exception("Exchange timed out");
- }
- } else {
- String id = (String)request.getAttribute(MessageExchange.class.getName());
- locks.remove(id);
- exchange = exchanges.remove(id);
- request.removeAttribute(MessageExchange.class.getName());
- }
- }
- // The continuation is a retry.
- // This happens when the SelectConnector is used and in two cases:
- // * the continuation has been resumed because the exchange has been received
- // * the continuation has timed out
- } else {
- synchronized (cont.getObject()) {
- // Get the exchange id from the request
- String id = (String) request.getAttribute(MessageExchange.class.getName());
- // Remove the continuation from the map, indicating it has been processed or timed out
- locks.remove(id);
- exchange = exchanges.remove(id);
- request.removeAttribute(MessageExchange.class.getName());
- // Check if this is a timeout
- if (exchange == null) {
- throw new IllegalStateException("Exchange not found");
- }
- if (!cont.isResumed()) {
- // When the exchange comes back later, the continuation will not be found and
- // the exchange will be set in an ERROR state by the process(MessageExchange) method
- throw new Exception("Exchange timed out: " + exchange.getExchangeId());
- }
- }
+ Fault fault = exchange.getFault();
+ if (fault != null) {
+ sendFault(exchange, fault, request, response);
+ } else {
+ NormalizedMessage outMsg = exchange.getMessage("out");
+ if (outMsg != null) {
+ sendOut(exchange, outMsg, request, response);
}
- // At this point, we have received the exchange response,
- // so process it and send back the HTTP response
- if (exchange.getStatus() == ExchangeStatus.ERROR) {
- Exception e = exchange.getError();
- if (e == null) {
- e = new Exception("Unknown error (exchange aborted ?)");
- }
- throw e;
- } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- try {
- Fault fault = exchange.getFault();
- if (fault != null) {
- sendFault(exchange, fault, request, response);
- } else {
- NormalizedMessage outMsg = exchange.getMessage("out");
- if (outMsg != null) {
- sendOut(exchange, outMsg, request, response);
- }
- }
- done(exchange);
- } catch (Exception e) {
- fail(exchange, e);
- throw e;
- }
- } else if (exchange.getStatus() == ExchangeStatus.DONE) {
- // This happens when there is no response to send back
- sendAccepted(exchange, request, response);
- }
- } catch (RetryRequest e) {
- throw e;
+ }
+ done(exchange);
} catch (Exception e) {
- sendError(exchange, e, request, response);
- }
- }
-
- private Continuation createContinuation(HttpServletRequest request) {
- // not giving a specific mutex will synchronize on the continuation itself
- Continuation continuation = ContinuationSupport.getContinuation(request, null);
- // Set the continuation's object that the endpoint will use to synchronize on to avoid a
- // deadlock between this endpoint and the Jetty continuation timeout mechanism
- // the endpoint now synchronizes on the continuation's object while Jetty synchronizes on
- // the continuation itself
- synchronized (continuation) {
- if (continuation.getObject() == null) {
- continuation.setObject(new Object());
- }
- }
- return continuation;
- }
-
- protected void loadStaticResources() throws Exception {
- }
-
- /**
- * Handle static resources
- *
- * @param request the http request
- * @param response the http response
- * @return <code>true</code> if the request has been handled
- * @throws IOException
- * @throws ServletException
- */
- protected boolean handleStaticResource(HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
- if (!"GET".equals(request.getMethod())) {
- return false;
- }
- String query = request.getQueryString();
- if (query != null && query.trim().equalsIgnoreCase("wsdl") && getResource(MAIN_WSDL) != null) {
- String uri = request.getRequestURI();
- if (!uri.endsWith("/")) {
- uri += "/";
- }
- uri += MAIN_WSDL;
- response.sendRedirect(uri);
- return true;
- }
- String path = request.getPathInfo();
- if (path.lastIndexOf('/') >= 0) {
- path = path.substring(path.lastIndexOf('/') + 1);
- }
- Object res = getResource(path);
- if (res == null) {
- return false;
- }
- if (res instanceof Node) {
- response.setStatus(200);
- response.setContentType("text/xml");
- try {
- new SourceTransformer().toResult(new DOMSource((Node)res),
- new StreamResult(response.getOutputStream()));
- } catch (TransformerException e) {
- throw new ServletException("Error while sending xml resource", e);
- }
- } else if (res != null) {
- // TODO: handle other static resources ...
- throw new ServletException("Unable to serialize resource");
- } else {
- return false;
- }
- return true;
- }
-
- protected Object getResource(String path) {
- return resources.get(path);
- }
-
- protected void addResource(String path, Object resource) {
- resources.put(path, resource);
- }
-
- protected ContextManager getServerManager() {
- HttpComponent comp = (HttpComponent) getServiceUnit().getComponent();
- return comp.getServer();
- }
-
- public MessageExchange createExchange(HttpServletRequest request) throws Exception {
- MessageExchange me = marshaler.createExchange(request, getContext());
- if (me.getEndpoint() == null) {
- configureExchangeTarget(me);
- }
- // If the user has been authenticated, put these informations on
- // the in NormalizedMessage.
- if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
- Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
- me.getMessage("in").setSecuritySubject(subject);
- }
- return me;
- }
-
- public void sendAccepted(MessageExchange exchange, HttpServletRequest request,
- HttpServletResponse response) throws Exception {
- marshaler.sendAccepted(exchange, request, response);
- }
-
- public void sendError(MessageExchange exchange, Exception error, HttpServletRequest request,
- HttpServletResponse response) throws Exception {
- marshaler.sendError(exchange, error, request, response);
- }
-
- public void sendFault(MessageExchange exchange, Fault fault, HttpServletRequest request,
- HttpServletResponse response) throws Exception {
- marshaler.sendFault(exchange, fault, request, response);
- }
-
- public void sendOut(MessageExchange exchange, NormalizedMessage outMsg, HttpServletRequest request,
- HttpServletResponse response) throws Exception {
- marshaler.sendOut(exchange, outMsg, request, response);
- }
-
- public void validate() throws DeploymentException {
- super.validate();
- if (locationURI == null || locationURI.trim().length() < 1) {
- throw new DeploymentException("The location URI is mandatory.");
- }
- if (endpoint != null && endpoint.contains(":")) {
- throw new DeploymentException("Endpoint name contains ':'. This character is not allowed as it can provide invalid WSDL.");
- }
- if (marshaler == null) {
- marshaler = new DefaultHttpConsumerMarshaler();
- }
- if (marshaler instanceof DefaultHttpConsumerMarshaler) {
- ((DefaultHttpConsumerMarshaler) marshaler).setDefaultMep(getDefaultMep());
+ fail(exchange, e);
+ throw e;
}
+ } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+ // This happens when there is no response to send back
+ sendAccepted(exchange, request, response);
+ }
+ } catch (RetryRequest e) {
+ throw e;
+ } catch (Exception e) {
+ sendError(exchange, e, request, response);
+ }
+ }
+
+ private Continuation createContinuation(HttpServletRequest request) {
+ // not giving a specific mutex will synchronize on the continuation itself
+ Continuation continuation = ContinuationSupport.getContinuation(request, null);
+ if (continuation instanceof WaitingContinuation) {
+ return continuation;
+ } else {
+ // wrap the continuation to avoid a deadlock between this endpoint and the Jetty continuation timeout mechanism
+ // the endpoint now synchronizes on the wrapper while Jetty synchronizes on the continuation itself
+ return new ContinuationWrapper(continuation);
+ }
+ }
+
+ protected void loadStaticResources() throws Exception {
+ }
+
+ /**
+ * Handle static resources
+ *
+ * @param request the http request
+ * @param response the http response
+ * @return <code>true</code> if the request has been handled
+ * @throws IOException
+ * @throws ServletException
+ */
+ protected boolean handleStaticResource(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ if (!"GET".equals(request.getMethod())) {
+ return false;
+ }
+ String query = request.getQueryString();
+ if (query != null && query.trim().equalsIgnoreCase("wsdl") && getResource(MAIN_WSDL) != null) {
+ String uri = request.getRequestURI();
+ if (!uri.endsWith("/")) {
+ uri += "/";
+ }
+ uri += MAIN_WSDL;
+ response.sendRedirect(uri);
+ return true;
+ }
+ String path = request.getPathInfo();
+ if (path.lastIndexOf('/') >= 0) {
+ path = path.substring(path.lastIndexOf('/') + 1);
+ }
+ Object res = getResource(path);
+ if (res == null) {
+ return false;
+ }
+ if (res instanceof Node) {
+ response.setStatus(200);
+ response.setContentType("text/xml");
+ try {
+ new SourceTransformer().toResult(new DOMSource((Node) res),
+ new StreamResult(response.getOutputStream()));
+ } catch (TransformerException e) {
+ throw new ServletException("Error while sending xml resource", e);
+ }
+ } else if (res != null) {
+ // TODO: handle other static resources ...
+ throw new ServletException("Unable to serialize resource");
+ } else {
+ return false;
+ }
+ return true;
+ }
+
+ protected Object getResource(String path) {
+ return resources.get(path);
+ }
+
+ protected void addResource(String path, Object resource) {
+ resources.put(path, resource);
+ }
+
+ protected ContextManager getServerManager() {
+ HttpComponent comp = (HttpComponent) getServiceUnit().getComponent();
+ return comp.getServer();
+ }
+
+ public MessageExchange createExchange(HttpServletRequest request) throws Exception {
+ MessageExchange me = marshaler.createExchange(request, getContext());
+ if (me.getEndpoint() == null) {
+ configureExchangeTarget(me);
+ }
+ // If the user has been authenticated, put these informations on
+ // the in NormalizedMessage.
+ if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
+ Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
+ me.getMessage("in").setSecuritySubject(subject);
+ }
+ return me;
+ }
+
+ public void sendAccepted(MessageExchange exchange, HttpServletRequest request,
+ HttpServletResponse response) throws Exception {
+ marshaler.sendAccepted(exchange, request, response);
+ }
+
+ public void sendError(MessageExchange exchange, Exception error, HttpServletRequest request,
+ HttpServletResponse response) throws Exception {
+ marshaler.sendError(exchange, error, request, response);
+ }
+
+ public void sendFault(MessageExchange exchange, Fault fault, HttpServletRequest request,
+ HttpServletResponse response) throws Exception {
+ marshaler.sendFault(exchange, fault, request, response);
+ }
+
+ public void sendOut(MessageExchange exchange, NormalizedMessage outMsg, HttpServletRequest request,
+ HttpServletResponse response) throws Exception {
+ marshaler.sendOut(exchange, outMsg, request, response);
+ }
+
+ public void validate() throws DeploymentException {
+ super.validate();
+ if (locationURI == null || locationURI.trim().length() < 1) {
+ throw new DeploymentException("The location URI is mandatory.");
+ }
+ if (endpoint != null && endpoint.contains(":")) {
+ throw new DeploymentException("Endpoint name contains ':'. This character is not allowed as it can provide invalid WSDL.");
+ }
+ if (marshaler == null) {
+ marshaler = new DefaultHttpConsumerMarshaler();
+ }
+ if (marshaler instanceof DefaultHttpConsumerMarshaler) {
+ ((DefaultHttpConsumerMarshaler) marshaler).setDefaultMep(getDefaultMep());
+ }
+ }
+
+ /*
+ * Continuation wrapper just delegates everything to the underlying Continuation
+ */
+ private static final class ContinuationWrapper implements Continuation {
+
+ private final Continuation continuation;
+
+ private ContinuationWrapper(Continuation continuation) {
+ super();
+ this.continuation = continuation;
+ }
+
+ public Object getObject() {
+ return continuation.getObject();
+ }
+
+ public boolean isNew() {
+ return continuation.isNew();
+ }
+
+ public boolean isPending() {
+ return continuation.isPending();
+ }
+
+ public boolean isResumed() {
+ return continuation.isResumed();
+ }
+
+ public void reset() {
+ continuation.reset();
+ }
+
+ public void resume() {
+ continuation.resume();
+ }
+
+ public void setObject(Object o) {
+ continuation.setObject(o);
}
-
- /*
- * Continuation wrapper just delegates everything to the underlying Continuation
- */
- private static final class ContinuationWrapper implements Continuation {
-
- private final Continuation continuation;
-
- private ContinuationWrapper(Continuation continuation) {
- super();
- this.continuation = continuation;
- }
-
- public Object getObject() {
- return continuation.getObject();
- }
-
- public boolean isNew() {
- return continuation.isNew();
- }
-
- public boolean isPending() {
- return continuation.isPending();
- }
-
- public boolean isResumed() {
- return continuation.isResumed();
- }
-
- public void reset() {
- continuation.reset();
- }
-
- public void resume() {
- continuation.resume();
- }
-
- public void setObject(Object o) {
- continuation.setObject(o);
- }
- public boolean suspend(long timeout) {
- return continuation.suspend(timeout);
- }
+ public boolean suspend(long timeout) {
+ return continuation.suspend(timeout);
}
-}
+ }
+}
\ No newline at end of file