You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/02/08 18:16:21 UTC
svn commit: r376001 -
/incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Author: gnodet
Date: Wed Feb 8 09:16:18 2006
New Revision: 376001
URL: http://svn.apache.org/viewcvs?rev=376001&view=rev
Log:
Fix continuations on jetty 6
Modified:
incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Modified: incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=376001&r1=376000&r2=376001&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Wed Feb 8 09:16:18 2006
@@ -46,6 +46,7 @@
import org.mortbay.jetty.handler.ContextHandler;
import org.mortbay.util.ajax.Continuation;
import org.mortbay.util.ajax.ContinuationSupport;
+import org.mortbay.util.ajax.WaitingContinuation;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@@ -76,13 +77,18 @@
public void process(MessageExchange exchange) throws Exception {
Continuation cont = (Continuation) locks.remove(exchange.getExchangeId());
if (cont != null) {
- cont.resume();
+ synchronized (cont) {
+ if (cont.isPending()) {
+ cont.resume();
+ }
+ }
} else {
throw new IllegalStateException("Exchange not found");
}
}
public void start() throws Exception {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
String url = endpoint.getLocationURI();
httpContext = getServerManager().createContext(url, this);
httpContext.start();
@@ -101,26 +107,36 @@
}
QName envelopeName = null;
try {
- Continuation cont = ContinuationSupport.getContinuation(request, this);
- MessageExchange exchange;
- if (cont.isNew()) {
- SoapMessage message = soapMarshaler.createReader().read(request.getInputStream(),
- request.getHeader("Content-Type"));
- exchange = soapHelper.createExchange(message);
- NormalizedMessage inMessage = exchange.getMessage("in");
- inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
- locks.put(exchange.getExchangeId(), cont);
- request.setAttribute(MessageExchange.class.getName(), exchange);
- ((BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle()).sendConsumerExchange(exchange, this);
- // TODO: make this timeout configurable
- boolean result = cont.suspend(1000 * 60); // 60 s
- // TODO: inconsitency between javadoc and implementation
- // the WaitingContinuation returns true if it has timed out
- if (result) {
- throw new Exception("Error sending exchange: aborted");
+ // Not giving a specific mutex will synchronize on the continuation itself
+ Continuation cont = ContinuationSupport.getContinuation(request, null);
+ // TODO: Bug in jetty: the mutex is not set
+ if (cont instanceof WaitingContinuation && cont.isNew()) {
+ ((WaitingContinuation) cont).setMutex(cont);
+ }
+ MessageExchange exchange;
+ // Need to synchronize on the continuation so that the Continuation.resume
+ // can not be called before the suspend
+ // It should work without that, but it does not :(
+ synchronized (cont) {
+ // If the continuation is not a retry
+ if (!cont.isPending()) {
+ SoapMessage message = soapMarshaler.createReader().read(request.getInputStream(),
+ request.getHeader("Content-Type"));
+ exchange = soapHelper.createExchange(message);
+ NormalizedMessage inMessage = exchange.getMessage("in");
+ inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
+ locks.put(exchange.getExchangeId(), cont);
+ request.setAttribute(MessageExchange.class.getName(), exchange);
+ ((BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle()).sendConsumerExchange(exchange, this);
+ // TODO: make this timeout configurable
+ boolean result = cont.suspend(1000 * 60); // 60 s
+ if (!result) {
+ throw new Exception("Error sending exchange: aborted");
+ }
+ } else {
+ exchange = (MessageExchange) request.getAttribute(MessageExchange.class.getName());
+ cont.suspend(0);
}
- } else {
- exchange = (MessageExchange) request.getAttribute(MessageExchange.class.getName());
}
if (exchange.getStatus() == ExchangeStatus.ERROR) {
exchange.setStatus(ExchangeStatus.DONE);