You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/02/08 18:16:21 UTC

svn commit: r376001 - /incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java

Author: gnodet
Date: Wed Feb  8 09:16:18 2006
New Revision: 376001

URL: http://svn.apache.org/viewcvs?rev=376001&view=rev
Log:
Fix continuations on jetty 6

Modified:
    incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java

Modified: incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=376001&r1=376000&r2=376001&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Wed Feb  8 09:16:18 2006
@@ -46,6 +46,7 @@
 import org.mortbay.jetty.handler.ContextHandler;
 import org.mortbay.util.ajax.Continuation;
 import org.mortbay.util.ajax.ContinuationSupport;
+import org.mortbay.util.ajax.WaitingContinuation;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
@@ -76,13 +77,18 @@
     public void process(MessageExchange exchange) throws Exception {
         Continuation cont = (Continuation) locks.remove(exchange.getExchangeId());
         if (cont != null) {
-            cont.resume();
+            synchronized (cont) {
+                if (cont.isPending()) {
+                    cont.resume();
+                }
+            }
         } else {
             throw new IllegalStateException("Exchange not found");
         }
     }
 
     public void start() throws Exception {
+        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
         String url = endpoint.getLocationURI();
         httpContext = getServerManager().createContext(url, this);
         httpContext.start();
@@ -101,26 +107,36 @@
         }
         QName envelopeName = null;
         try {
-            Continuation cont = ContinuationSupport.getContinuation(request, this);
-            MessageExchange exchange; 
-            if (cont.isNew()) {
-                SoapMessage message = soapMarshaler.createReader().read(request.getInputStream(), 
-                                                                        request.getHeader("Content-Type"));
-                exchange = soapHelper.createExchange(message);
-                NormalizedMessage inMessage = exchange.getMessage("in");
-                inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
-                locks.put(exchange.getExchangeId(), cont);
-                request.setAttribute(MessageExchange.class.getName(), exchange);
-                ((BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle()).sendConsumerExchange(exchange, this);
-                // TODO: make this timeout configurable
-                boolean result = cont.suspend(1000 * 60); // 60 s
-                // TODO: inconsitency between javadoc and implementation
-                // the WaitingContinuation returns true if it has timed out
-                if (result) {
-                    throw new Exception("Error sending exchange: aborted");
+            // Not giving a specific mutex will synchronize on the continuation itself
+            Continuation cont = ContinuationSupport.getContinuation(request, null);
+            // TODO: Bug in jetty: the mutex is not set
+            if (cont instanceof WaitingContinuation && cont.isNew()) {
+                ((WaitingContinuation) cont).setMutex(cont);
+            }
+            MessageExchange exchange;
+            // Need to synchronize on the continuation so that Continuation.resume
+            // cannot be called before the suspend
+            // It should work without that, but it does not :(
+            synchronized (cont) {
+                // If the continuation is not a retry
+                if (!cont.isPending()) {
+                    SoapMessage message = soapMarshaler.createReader().read(request.getInputStream(), 
+                                                                            request.getHeader("Content-Type"));
+                    exchange = soapHelper.createExchange(message);
+                    NormalizedMessage inMessage = exchange.getMessage("in");
+                    inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
+                    locks.put(exchange.getExchangeId(), cont);
+                    request.setAttribute(MessageExchange.class.getName(), exchange);
+                    ((BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle()).sendConsumerExchange(exchange, this);
+                    // TODO: make this timeout configurable
+                    boolean result = cont.suspend(1000 * 60); // 60 s
+                    if (!result) {
+                        throw new Exception("Error sending exchange: aborted");
+                    }
+                } else {
+                    exchange = (MessageExchange) request.getAttribute(MessageExchange.class.getName());
+                    cont.suspend(0);
                 }
-            } else {
-                exchange = (MessageExchange) request.getAttribute(MessageExchange.class.getName());
             }
             if (exchange.getStatus() == ExchangeStatus.ERROR) {
                 exchange.setStatus(ExchangeStatus.DONE);