You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2011/12/21 09:36:01 UTC

svn commit: r1221644 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Author: norman
Date: Wed Dec 21 08:36:01 2011
New Revision: 1221644

URL: http://svn.apache.org/viewvc?rev=1221644&view=rev
Log:
Make sure the Responses are written in the correct order even if FutureResponse's and Response's get mixed. See PROTOCOLS-62

Modified:
    james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221644&r1=1221643&r2=1221644&view=diff
==============================================================================
--- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (original)
+++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Dec 21 08:36:01 2011
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import org.apache.james.protocols.api.FutureResponse.ResponseListener;
@@ -42,7 +43,8 @@ public abstract class AbstractProtocolTr
     
     // TODO: Should we limit the size ?
     private final ConcurrentLinkedQueue<Response> responses = new ConcurrentLinkedQueue<Response>();
-
+    private final AtomicBoolean write = new AtomicBoolean(false);
+    
     /**
      * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession)
      */
@@ -64,37 +66,48 @@ public abstract class AbstractProtocolTr
      */
     private  void writeQueuedResponses(final ProtocolSession session) {
         Response queuedResponse = null;
-            
-        // dequeue Responses until non is left
-        while ((queuedResponse = responses.poll()) != null) {
-                
-            // check if we need to take special care of FutureResponses
-            if (queuedResponse instanceof FutureResponse) {
-                FutureResponse futureResponse =(FutureResponse) queuedResponse;
-                if (futureResponse.isReady()) {
-                    // future is ready so we can write it without blocking the IO-Thread
-                    writeResponseToClient(queuedResponse, session);
-                } else {
-                        
-                    // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread
-                    futureResponse.addListener(new ResponseListener() {
+        
+        if (write.compareAndSet(false, true)){
+            boolean listenerAdded = false;
+            // dequeue Responses until non is left
+            while ((queuedResponse = responses.poll()) != null) {
+                    
+                // check if we need to take special care of FutureResponses
+                if (queuedResponse instanceof FutureResponse) {
+                    FutureResponse futureResponse =(FutureResponse) queuedResponse;
+                    if (futureResponse.isReady()) {
+                        // future is ready so we can write it without blocking the IO-Thread
+                        writeResponseToClient(queuedResponse, session);
+                    } else {
                             
-                        public void onResponse(FutureResponse response) {
-                            writeResponseToClient(response, session);
-                            writeQueuedResponses(session);
-                        }
-                    });
+                        // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread
+                        futureResponse.addListener(new ResponseListener() {
+                                
+                            public void onResponse(FutureResponse response) {
+                                writeResponseToClient(response, session);
+                                if (write.compareAndSet(true, false)) {
+                                    writeQueuedResponses(session);
+                                }
+                            }
+                        });
+                        listenerAdded = true;
+                        // just break here as we will trigger the dequeue later
+                        break;
+                    }
                         
-                    // just break here as we will trigger the dequeue later
-                    break;
+                } else {
+                    // the Response is not a FutureResponse, so just write it back the the remote peer
+                    writeResponseToClient(queuedResponse, session);
                 }
-                    
-            } else {
-                // the Response is not a FutureResponse, so just write it back the the remote peer
-                writeResponseToClient(queuedResponse, session);
+                
+            }
+            // Check if a ResponseListener was added before. If not we can allow to write
+            // responses again. Otherwise the writing will get triggered from the listener
+            if (listenerAdded == false) {
+                write.set(false);
             }
-            
         }
+
         
     }
     



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org