You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2011/12/21 09:36:01 UTC
svn commit: r1221644 -
/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Author: norman
Date: Wed Dec 21 08:36:01 2011
New Revision: 1221644
URL: http://svn.apache.org/viewvc?rev=1221644&view=rev
Log:
Make sure the Responses are written in the correct order even if FutureResponse's and Response's get mixed. See PROTOCOLS-62
Modified:
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221644&r1=1221643&r2=1221644&view=diff
==============================================================================
--- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (original)
+++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Dec 21 08:36:01 2011
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.james.protocols.api.FutureResponse.ResponseListener;
@@ -42,7 +43,8 @@ public abstract class AbstractProtocolTr
// TODO: Should we limit the size ?
private final ConcurrentLinkedQueue<Response> responses = new ConcurrentLinkedQueue<Response>();
-
+ private final AtomicBoolean write = new AtomicBoolean(false);
+
/**
* @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession)
*/
@@ -64,37 +66,48 @@ public abstract class AbstractProtocolTr
*/
private void writeQueuedResponses(final ProtocolSession session) {
Response queuedResponse = null;
-
- // dequeue Responses until non is left
- while ((queuedResponse = responses.poll()) != null) {
-
- // check if we need to take special care of FutureResponses
- if (queuedResponse instanceof FutureResponse) {
- FutureResponse futureResponse =(FutureResponse) queuedResponse;
- if (futureResponse.isReady()) {
- // future is ready so we can write it without blocking the IO-Thread
- writeResponseToClient(queuedResponse, session);
- } else {
-
- // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread
- futureResponse.addListener(new ResponseListener() {
+
+ if (write.compareAndSet(false, true)){
+ boolean listenerAdded = false;
+ // dequeue Responses until non is left
+ while ((queuedResponse = responses.poll()) != null) {
+
+ // check if we need to take special care of FutureResponses
+ if (queuedResponse instanceof FutureResponse) {
+ FutureResponse futureResponse =(FutureResponse) queuedResponse;
+ if (futureResponse.isReady()) {
+ // future is ready so we can write it without blocking the IO-Thread
+ writeResponseToClient(queuedResponse, session);
+ } else {
- public void onResponse(FutureResponse response) {
- writeResponseToClient(response, session);
- writeQueuedResponses(session);
- }
- });
+ // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread
+ futureResponse.addListener(new ResponseListener() {
+
+ public void onResponse(FutureResponse response) {
+ writeResponseToClient(response, session);
+ if (write.compareAndSet(true, false)) {
+ writeQueuedResponses(session);
+ }
+ }
+ });
+ listenerAdded = true;
+ // just break here as we will trigger the dequeue later
+ break;
+ }
- // just break here as we will trigger the dequeue later
- break;
+ } else {
+ // the Response is not a FutureResponse, so just write it back the the remote peer
+ writeResponseToClient(queuedResponse, session);
}
-
- } else {
- // the Response is not a FutureResponse, so just write it back the the remote peer
- writeResponseToClient(queuedResponse, session);
+
+ }
+ // Check if a ResponseListener was added before. If not we can allow to write
+ // responses again. Otherwise the writing will get triggered from the listener
+ if (listenerAdded == false) {
+ write.set(false);
}
-
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org