You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/12/22 13:24:05 UTC

svn commit: r728661 - /servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java

Author: gertv
Date: Mon Dec 22 04:24:05 2008
New Revision: 728661

URL: http://svn.apache.org/viewvc?rev=728661&view=rev
Log:
SM-1710: Running servicemix-http unit tests stalls intermittently

Modified:
    servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java

Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=728661&r1=728660&r2=728661&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Mon Dec 22 04:24:05 2008
@@ -53,6 +53,7 @@
 import org.mortbay.jetty.RetryRequest;
 import org.mortbay.util.ajax.Continuation;
 import org.mortbay.util.ajax.ContinuationSupport;
+import org.mortbay.util.ajax.WaitingContinuation;
 
 /**
  * Plain HTTP consumer endpoint. This endpoint can be used to handle plain HTTP request (without SOAP) or to be able to
@@ -271,16 +272,14 @@
             if (handleStaticResource(request, response)) {
                 return;
             }
-            // Not giving a specific mutex will synchronize on the continuation
-            // itself
-            Continuation cont = ContinuationSupport.getContinuation(request, null);
+            Continuation cont = createContinuation(request);
             // If the continuation is not a retry
             if (!cont.isPending()) {
-	            // Check endpoint is started
-	            if (!started) {
-	                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Endpoint is stopped");
-	                return;
-	            }
+                // Check endpoint is started
+                if (!started) {
+                   response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Endpoint is stopped");
+                   return;
+                }
                 // Create the exchange
                 exchange = createExchange(request);
                 // Put the exchange in a map so that we can later retrieve it
@@ -380,6 +379,18 @@
         }
     }
 
+    private Continuation createContinuation(HttpServletRequest request) {
+        // passing null as the mutex makes Jetty synchronize on the continuation itself
+        Continuation continuation = ContinuationSupport.getContinuation(request, null);
+        if (continuation instanceof WaitingContinuation) {
+            return continuation; // a WaitingContinuation is handed back unwrapped
+        } else {
+            // wrap any other continuation to avoid a deadlock between this endpoint and Jetty's continuation timeout mechanism:
+            // the endpoint then synchronizes on the wrapper while Jetty keeps synchronizing on the continuation itself
+            return new ContinuationWrapper(continuation);
+        }
+    }
+
     protected void loadStaticResources() throws Exception {
     }
 
@@ -489,4 +500,49 @@
             ((DefaultHttpConsumerMarshaler) marshaler).setDefaultMep(getDefaultMep());
         }
     }
+    
+    /*
+     * Continuation wrapper: a distinct object that forwards every Continuation call unchanged to the wrapped instance
+     */
+    private static final class ContinuationWrapper implements Continuation {
+        
+        private final Continuation continuation; // the wrapped continuation that receives every forwarded call
+
+        private ContinuationWrapper(Continuation continuation) {
+            super();
+            this.continuation = continuation;
+        }
+
+        public Object getObject() {
+            return continuation.getObject();
+        }
+
+        public boolean isNew() {
+            return continuation.isNew();
+        }
+
+        public boolean isPending() {
+            return continuation.isPending();
+        }
+
+        public boolean isResumed() {
+            return continuation.isResumed();
+        }
+
+        public void reset() {
+            continuation.reset();
+        }
+
+        public void resume() {
+            continuation.resume();
+        }
+
+        public void setObject(Object o) {
+            continuation.setObject(o);            
+        }
+
+        public boolean suspend(long timeout) {
+            return continuation.suspend(timeout);
+        }
+    }
 }