You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ba...@apache.org on 2011/12/21 15:47:25 UTC

svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Author: bago
Date: Wed Dec 21 14:47:25 2011
New Revision: 1221748

URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
Log:
An attempt to refactor AbstractProtocolTransport to be thread safe. I moved back to standard synchronization as we only have max 2 threads competing for the queue so it doesn't make sense to use a non blocking queue. Norman, please overview, and feel free to revert if you don't like the solution (i thought it was better to simply commit instead of opening a JIRA to show you this).

Modified:
    james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
==============================================================================
--- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (original)
+++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Dec 21 14:47:25 2011
@@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.james.protocols.api.FutureResponse.ResponseListener;
 
@@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
 
     
     // TODO: Should we limit the size ?
-    private final ConcurrentLinkedQueue<Response> responses = new ConcurrentLinkedQueue<Response>();
-    private final AtomicBoolean write = new AtomicBoolean(false);
+    private final Queue<Response> responses = new LinkedBlockingQueue<Response>();
+    private volatile boolean isAsync = false;
     
     /**
      * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession)
      */
     public final void writeResponse(Response response, final ProtocolSession session) {
-        // just add the response to the queue. We will trigger the write operation later
-        responses.add(response);
-             
-        // trigger the write
-        writeQueuedResponses(session);
+        // if we already in asynchrnous mode we simply enqueue the response
+        // we do this synchronously because we may have a dequeuer thread working on
+        // isAsync and responses.
+        boolean enqueued = false;
+        synchronized(this) {
+            if (isAsync == true) {
+                responses.offer(response);
+                enqueued = true;
+            }
+        }
+        
+        // if we didn't enqueue then we check if the response is writable or we have to 
+        // set us "asynchrnous" and wait for response to be ready.
+        if (!enqueued) {
+            if (isResponseWritable(response)) {
+                writeResponseToClient(response, session);
+            } else {
+                addDequeuerListener(response, session);
+                isAsync = true;
+            }
+        }
     }
     
     /**
@@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
      * @param session
      */
     private  void writeQueuedResponses(final ProtocolSession session) {
-        Response queuedResponse = null;
         
-        if (write.compareAndSet(false, true)){
-            boolean listenerAdded = false;
-            // dequeue Responses until non is left
-            while ((queuedResponse = responses.poll()) != null) {
-                    
-                // check if we need to take special care of FutureResponses
-                if (queuedResponse instanceof FutureResponse) {
-                    FutureResponse futureResponse =(FutureResponse) queuedResponse;
-                    if (futureResponse.isReady()) {
-                        // future is ready so we can write it without blocking the IO-Thread
-                        writeResponseToClient(queuedResponse, session);
-                    } else {
-                            
-                        // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread
-                        futureResponse.addListener(new ResponseListener() {
-                                
-                            public void onResponse(FutureResponse response) {
-                                writeResponseToClient(response, session);
-                                if (write.compareAndSet(true, false)) {
-                                    writeQueuedResponses(session);
-                                }
-                            }
-                        });
-                        listenerAdded = true;
-                        // just break here as we will trigger the dequeue later
-                        break;
-                    }
-                        
-                } else {
-                    // the Response is not a FutureResponse, so just write it back the the remote peer
-                    writeResponseToClient(queuedResponse, session);
+        // dequeue Responses until non is left
+        while (true) {
+            
+            Response queuedResponse = null;
+            
+            // synchrnously we check responses and if it is empty we move back to non asynch
+            // behaviour
+            synchronized(this) {
+                queuedResponse = responses.poll();
+                if (queuedResponse == null) {
+                    isAsync = false;
+                    break;
                 }
-                
             }
-            // Check if a ResponseListener was added before. If not we can allow to write
-            // responses again. Otherwise the writing will get triggered from the listener
-            if (listenerAdded == false) {
-                write.set(false);
+
+            // if we have something in the queue we continue writing until we
+            // find something asynchronous.
+            if (isResponseWritable(queuedResponse)) {
+                writeResponseToClient(queuedResponse, session);
+            } else {
+                addDequeuerListener(queuedResponse, session);
+                // no changes to isAsync here, because in this method we are always already async.
+                break;
             }
         }
-
-        
+    }
+    
+    private boolean isResponseWritable(Response response) {
+        return !(response instanceof FutureResponse) || ((FutureResponse) response).isReady();
+    }
+    
+    private void addDequeuerListener(Response response, final ProtocolSession session) {
+        ((FutureResponse) response).addListener(new ResponseListener() {
+                
+            public void onResponse(FutureResponse response) {
+                writeResponseToClient(response, session);
+                writeQueuedResponses(session);
+            }
+        });
     }
     
     /**



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Re: svn commit: r1221748 -/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Norman Maurer <no...@apache.org>.
This makes sense! Thanks for taking care. I will cut a new release now that this is fixed. I think I will call it 1.6.0-RC1 as we are really close ;)

Bye,
Norman


-- 
Norman Maurer


Am Mittwoch, 21. Dezember 2011 um 15:47 schrieb bago@apache.org:

> Author: bago
> Date: Wed Dec 21 14:47:25 2011
> New Revision: 1221748
> 
> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
> Log:
> An attempt to refactor AbstractProtocolTransport to be thread safe. I moved back to standard synchronization as we only have max 2 threads competing for the queue so it doesn't make sense to use a non blocking queue. Norman, please overview, and feel free to revert if you don't like the solution (i thought it was better to simply commit instead of opening a JIRA to show you this).
> 
> Modified:
> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
> 
> Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
> URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
> ==============================================================================
> --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (original)
> +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Dec 21 14:47:25 2011
> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
> import java.io.InputStream;
> import java.io.UnsupportedEncodingException;
> import java.util.List;
> -import java.util.concurrent.ConcurrentLinkedQueue;
> -import java.util.concurrent.atomic.AtomicBoolean;
> -
> +import java.util.Queue;
> +import java.util.concurrent.LinkedBlockingQueue;
> 
> import org.apache.james.protocols.api.FutureResponse.ResponseListener;
> 
> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
> 
> 
> // TODO: Should we limit the size ?
> - private final ConcurrentLinkedQueue<Response> responses = new ConcurrentLinkedQueue<Response>();
> - private final AtomicBoolean write = new AtomicBoolean(false);
> + private final Queue<Response> responses = new LinkedBlockingQueue<Response>();
> + private volatile boolean isAsync = false;
> 
> /**
> * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession)
> */
> public final void writeResponse(Response response, final ProtocolSession session) {
> - // just add the response to the queue. We will trigger the write operation later
> - responses.add(response);
> - 
> - // trigger the write
> - writeQueuedResponses(session);
> + // if we already in asynchrnous mode we simply enqueue the response
> + // we do this synchronously because we may have a dequeuer thread working on
> + // isAsync and responses.
> + boolean enqueued = false;
> + synchronized(this) {
> + if (isAsync == true) {
> + responses.offer(response);
> + enqueued = true;
> + }
> + }
> + 
> + // if we didn't enqueue then we check if the response is writable or we have to 
> + // set us "asynchrnous" and wait for response to be ready.
> + if (!enqueued) {
> + if (isResponseWritable(response)) {
> + writeResponseToClient(response, session);
> + } else {
> + addDequeuerListener(response, session);
> + isAsync = true;
> + }
> + }
> }
> 
> /**
> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
> * @param session
> */
> private void writeQueuedResponses(final ProtocolSession session) {
> - Response queuedResponse = null;
> 
> - if (write.compareAndSet(false, true)){
> - boolean listenerAdded = false;
> - // dequeue Responses until non is left
> - while ((queuedResponse = responses.poll()) != null) {
> - 
> - // check if we need to take special care of FutureResponses
> - if (queuedResponse instanceof FutureResponse) {
> - FutureResponse futureResponse =(FutureResponse) queuedResponse;
> - if (futureResponse.isReady()) {
> - // future is ready so we can write it without blocking the IO-Thread
> - writeResponseToClient(queuedResponse, session);
> - } else {
> - 
> - // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread
> - futureResponse.addListener(new ResponseListener() {
> - 
> - public void onResponse(FutureResponse response) {
> - writeResponseToClient(response, session);
> - if (write.compareAndSet(true, false)) {
> - writeQueuedResponses(session);
> - }
> - }
> - });
> - listenerAdded = true;
> - // just break here as we will trigger the dequeue later
> - break;
> - }
> - 
> - } else {
> - // the Response is not a FutureResponse, so just write it back the the remote peer
> - writeResponseToClient(queuedResponse, session);
> + // dequeue Responses until non is left
> + while (true) {
> + 
> + Response queuedResponse = null;
> + 
> + // synchrnously we check responses and if it is empty we move back to non asynch
> + // behaviour
> + synchronized(this) {
> + queuedResponse = responses.poll();
> + if (queuedResponse == null) {
> + isAsync = false;
> + break;
> }
> - 
> }
> - // Check if a ResponseListener was added before. If not we can allow to write
> - // responses again. Otherwise the writing will get triggered from the listener
> - if (listenerAdded == false) {
> - write.set(false);
> +
> + // if we have something in the queue we continue writing until we
> + // find something asynchronous.
> + if (isResponseWritable(queuedResponse)) {
> + writeResponseToClient(queuedResponse, session);
> + } else {
> + addDequeuerListener(queuedResponse, session);
> + // no changes to isAsync here, because in this method we are always already async.
> + break;
> }
> }
> -
> - 
> + }
> + 
> + private boolean isResponseWritable(Response response) {
> + return !(response instanceof FutureResponse) || ((FutureResponse) response).isReady();
> + }
> + 
> + private void addDequeuerListener(Response response, final ProtocolSession session) {
> + ((FutureResponse) response).addListener(new ResponseListener() {
> + 
> + public void onResponse(FutureResponse response) {
> + writeResponseToClient(response, session);
> + writeQueuedResponses(session);
> + }
> + });
> }
> 
> /**
> 
> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org (mailto:server-dev-unsubscribe@james.apache.org)
> For additional commands, e-mail: server-dev-help@james.apache.org (mailto:server-dev-help@james.apache.org)
> 
> 



Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Eric Charles <er...@apache.org>.
Sorry Stefano :)

Thx Norman, I finally found the AbstractProtocolTransportTest (the test
source folder was not in my build path).

Eric


On 24/12/11 16:28, Norman Maurer wrote:
> Hi Eric,
>
> Its Stefano not Stephano ;)
>
> Comments inside...
>
> Am Samstag, 24. Dezember 2011 schrieb Eric Charles<er...@apache.org>:
>> Hi Stephano, Thx for the inputs. I'm fine to have volatile fields and to
> use them is synchronized blocks when needed, and out-of synchronized blocks
> when possible.
>>
>> I still have to find the few hours to better understand the usage and
> context.
>>
>> Maybe you can give me a hint about about the 2 user threads?
>>
>> Is there a test case that ensures the class is thread-safe (or sould I
> ask 'is it feasible to have a test for this')?
>>
>
> Check Out the AbstractProtocolTransportTest..
>
>> How did you came to the conclusion it was not thread-safe: pure code
> review, or exceptions/abnormal behavior in an operational deployment?
>
> I noticed that the Responses were written out of order at a paid Project .
> That was why I was starting to investigate..
>>
>> I believe the beauty resides in the easy-understanding. For example, if
> this class is designed to be used by only 2 threads, it should come out for
> the reader from javadoc, method, fields... namings.
>
> As this is more a internal thing I would Not mention it.
>>
>> Thx,
>>
>> Eric
>
> Bye,
> Norman
>
>>
>>
>> On 24/12/11 12:19, Stefano Bagnara wrote:
>>
>> 2011/12/24 Eric Charles<er...@apache.org>:
>>
>> Hi Stephano,
>>
>> Opening the discussion to learn more :)
>>
>> - Why are you considering that 2 threads is a criteria to use standard
>> synchronization rather than some atomic fields.
>>
>> It is a criteria to not use the ConcurrentLinkedQueue that is a
>> structure thought to handle many concurrent threads and is overkill
>> for 2 threads.
>>
>> - I can understand you replace a concurrent by a non-concurrent queue.
>> However, you now have a blocking queue. Is there an impact due to this
>> blocking aspect?
>>
>> I think the answer is no. That's why I did that.
>> Remember we have 2 threads and they do 2 different things, they simply
>> block each other when they add or remove from the queue.
>>
>> - You defined isAsync as volatile and sometimes encapsulate access to
>> isAsync in a synchronized block, sometime not. Why using 2 different
>> thread-safety strategies in this class?
>>
>> Because some times it needed sincronization, other times I felt it was
>> not needed (the access to the volatile doesn't need synchronization. I
>> just synchronize to ensure that the change to the list happens
>> together with the change in the volatile var).
>> If you can find a better solution you're welcome to provide one. It
>> took a couple of hours to reach a working solution.
>>
>> The previous one was not thread safe at this line:
>> ------
>> if (listenerAdded == false) {
>>    write.set(false);
>> -----
>> It could happen another thread already added a new item to the queue
>> but skipped to process it because write was true. So we ended up with
>> an item in the queue never written.
>>
>> I don't like too much my solution and I felt it a bit hackish, but
>> that was my best solution for my limited time, so if you can provide a
>> more elegant solution while still being thread safe, I'm more than
>> happy :-)
>>
>> Stefano
>>
>>
>> Thx,
>>
>> Eric
>>
>>
>>
>> On 21/12/11 15:47, bago@apache.org wrote:
>>
>> Author: bago
>> Date: Wed Dec 21 14:47:25 2011
>> New Revision: 1221748
>>
>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>> Log:
>> An attempt to refactor AbstractProtocolTransport to be thread safe. I
>> moved back to standard synchronization as we only have max 2 threads
>> competing for the queue so it doesn't make sense to use a non blocking
>> queue. Norman, please overview, and feel free to revert if you don't like
>> the solution (i thought it was better to simply commit instead of opening
> a
>> JIRA to show you this).
>>
>> Modified:
>>
>>
> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>
>> Modified:
>>
> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> URL:
>>
> http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>>
>>
> ==============================================================================
>> ---
>>
> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> (original)
>> +++
>>
> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> Wed Dec 21 14:47:25 2011
>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>>   import java.io.InputStream;
>>   import java.io.UnsupportedEncodingException;
>>   import java.util.List;
>> -import java.util.concurrent.ConcurrentLinkedQueue;
>> -import java.util.concurrent.atomic.AtomicBool
>

-- 
eric | http://about.echarles.net | @echarles

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Norman Maurer <no...@googlemail.com>.
Hi Eric,

Its Stefano not Stephano ;)

Comments inside...

Am Samstag, 24. Dezember 2011 schrieb Eric Charles <er...@apache.org>:
> Hi Stephano, Thx for the inputs. I'm fine to have volatile fields and to
use them is synchronized blocks when needed, and out-of synchronized blocks
when possible.
>
> I still have to find the few hours to better understand the usage and
context.
>
> Maybe you can give me a hint about about the 2 user threads?
>
> Is there a test case that ensures the class is thread-safe (or sould I
ask 'is it feasible to have a test for this')?
>

Check Out the AbstractProtocolTransportTest..

> How did you came to the conclusion it was not thread-safe: pure code
review, or exceptions/abnormal behavior in an operational deployment?

I noticed that the Responses were written out of order at a paid Project .
That was why I was starting to investigate..
>
> I believe the beauty resides in the easy-understanding. For example, if
this class is designed to be used by only 2 threads, it should come out for
the reader from javadoc, method, fields... namings.

As this is more a internal thing I would Not mention it.
>
> Thx,
>
> Eric

Bye,
Norman

>
>
> On 24/12/11 12:19, Stefano Bagnara wrote:
>
> 2011/12/24 Eric Charles<er...@apache.org>:
>
> Hi Stephano,
>
> Opening the discussion to learn more :)
>
> - Why are you considering that 2 threads is a criteria to use standard
> synchronization rather than some atomic fields.
>
> It is a criteria to not use the ConcurrentLinkedQueue that is a
> structure thought to handle many concurrent threads and is overkill
> for 2 threads.
>
> - I can understand you replace a concurrent by a non-concurrent queue.
> However, you now have a blocking queue. Is there an impact due to this
> blocking aspect?
>
> I think the answer is no. That's why I did that.
> Remember we have 2 threads and they do 2 different things, they simply
> block each other when they add or remove from the queue.
>
> - You defined isAsync as volatile and sometimes encapsulate access to
> isAsync in a synchronized block, sometime not. Why using 2 different
> thread-safety strategies in this class?
>
> Because some times it needed sincronization, other times I felt it was
> not needed (the access to the volatile doesn't need synchronization. I
> just synchronize to ensure that the change to the list happens
> together with the change in the volatile var).
> If you can find a better solution you're welcome to provide one. It
> took a couple of hours to reach a working solution.
>
> The previous one was not thread safe at this line:
> ------
> if (listenerAdded == false) {
>   write.set(false);
> -----
> It could happen another thread already added a new item to the queue
> but skipped to process it because write was true. So we ended up with
> an item in the queue never written.
>
> I don't like too much my solution and I felt it a bit hackish, but
> that was my best solution for my limited time, so if you can provide a
> more elegant solution while still being thread safe, I'm more than
> happy :-)
>
> Stefano
>
>
> Thx,
>
> Eric
>
>
>
> On 21/12/11 15:47, bago@apache.org wrote:
>
> Author: bago
> Date: Wed Dec 21 14:47:25 2011
> New Revision: 1221748
>
> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
> Log:
> An attempt to refactor AbstractProtocolTransport to be thread safe. I
> moved back to standard synchronization as we only have max 2 threads
> competing for the queue so it doesn't make sense to use a non blocking
> queue. Norman, please overview, and feel free to revert if you don't like
> the solution (i thought it was better to simply commit instead of opening
a
> JIRA to show you this).
>
> Modified:
>
>
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>
> Modified:
>
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
> URL:
>
http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>
>
==============================================================================
> ---
>
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
> (original)
> +++
>
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
> Wed Dec 21 14:47:25 2011
> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>  import java.io.InputStream;
>  import java.io.UnsupportedEncodingException;
>  import java.util.List;
> -import java.util.concurrent.ConcurrentLinkedQueue;
> -import java.util.concurrent.atomic.AtomicBool

Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Stefano Bagnara <ap...@bago.org>.
2011/12/24 Eric Charles <er...@apache.org>:
> Maybe you can give me a hint about about the 2 user threads?

The main thread is the one that writes new "responses". It is the
protocol thread and calls the writeResponse method each time a new
response is available.
If the response is a FutureResponse then the whole thing switches to a
buffered mode (response queue) and a listener is added to listen for
future completion.
The thread that will receive the future completion will then work to
dequeue the buffered responses.
If it finds more FutureResponses it will start the listening again,
otherwise if the queue is clear then it switches back to direct writing.

So, you have 1 thread writing responses and maybe 1 thread working to
dequeue things from the buffer. Of course the real concurrency depends
on the protocol implementation, how much they use FutureResponses
instead of simple Response and how long they wait for such responses.
They have very low concurrency. In most cases you won't have new
writes until the future response is answered, so I expect that in most
cases we won't have concurrency at all.

> Is there a test case that ensures the class is thread-safe (or sould I ask
> 'is it feasible to have a test for this')?

IMO it is not feasible.
Maybe you can write tests to reproduce some issue on specific
hardware, but this won't be in any way a full test suite.

> How did you came to the conclusion it was not thread-safe: pure code review,
> or exceptions/abnormal behavior in an operational deployment?

For the code from 2 versions ago we even had a test case (Norman wrote
it), but mainly because it was a major issue.
For the previous version, instead, it was from pure code review: the
thread that is dequeuing responses loops until the queue is empty (A),
then if it is empty and there are no more futures involved then it
re-enables direct writing (B). If another thread tried to write a new
response while the first thread was between (A) and (B) then you end
up with an element in the queue and no one taking care to write it
(and this breaks almost any protocol I know).
The fact that (A) and (B) are near and the fact that we only rarely
have 2 threads working on that means this would probably happen very
rarely in production, but still it exists.

> I believe the beauty resides in the easy-understanding. For example, if this
> class is designed to be used by only 2 threads, it should come out for the
> reader from javadoc, method, fields... namings.
> Thx,

I usually avoid using volatile/synchronization and prefer using some
class from the wonderful java concurrent package, but I didn't find a
better solution so I moved to "something working" (or at least I hope
it works, that's why I asked for review).

Stefano

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Eric Charles <er...@apache.org>.
Hi Stephano, Thx for the inputs. I'm fine to have volatile fields and to
use them in synchronized blocks when needed, and out-of synchronized
blocks when possible.

I still have to find the few hours to better understand the usage and 
context.

Maybe you can give me a hint about the 2 user threads?

Is there a test case that ensures the class is thread-safe (or should I 
ask 'is it feasible to have a test for this')?

How did you come to the conclusion it was not thread-safe: pure code 
review, or exceptions/abnormal behavior in an operational deployment?

I believe the beauty resides in the easy-understanding. For example, if 
this class is designed to be used by only 2 threads, it should come out 
for the reader from javadoc, method, fields... namings.

Thx,

Eric


On 24/12/11 12:19, Stefano Bagnara wrote:
> 2011/12/24 Eric Charles<er...@apache.org>:
>> Hi Stephano,
>>
>> Opening the discussion to learn more :)
>>
>> - Why are you considering that 2 threads is a criteria to use standard
>> synchronization rather than some atomic fields.
>
> It is a criteria to not use the ConcurrentLinkedQueue that is a
> structure thought to handle many concurrent threads and is overkill
> for 2 threads.
>
>> - I can understand you replace a concurrent by a non-concurrent queue.
>> However, you now have a blocking queue. Is there an impact due to this
>> blocking aspect?
>
> I think the answer is no. That's why I did that.
> Remember we have 2 threads and they do 2 different things, they simply
> block each other when they add or remove from the queue.
>
>> - You defined isAsync as volatile and sometimes encapsulate access to
>> isAsync in a synchronized block, sometime not. Why using 2 different
>> thread-safety strategies in this class?
>
> Because some times it needed sincronization, other times I felt it was
> not needed (the access to the volatile doesn't need synchronization. I
> just synchronize to ensure that the change to the list happens
> together with the change in the volatile var).
> If you can find a better solution you're welcome to provide one. It
> took a couple of hours to reach a working solution.
>
> The previous one was not thread safe at this line:
> ------
> if (listenerAdded == false) {
>    write.set(false);
> -----
> It could happen another thread already added a new item to the queue
> but skipped to process it because write was true. So we ended up with
> an item in the queue never written.
>
> I don't like too much my solution and I felt it a bit hackish, but
> that was my best solution for my limited time, so if you can provide a
> more elegant solution while still being thread safe, I'm more than
> happy :-)
>
> Stefano
>
>>
>> Thx,
>>
>> Eric
>>
>>
>>
>> On 21/12/11 15:47, bago@apache.org wrote:
>>>
>>> Author: bago
>>> Date: Wed Dec 21 14:47:25 2011
>>> New Revision: 1221748
>>>
>>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>>> Log:
>>> An attempt to refactor AbstractProtocolTransport to be thread safe. I
>>> moved back to standard synchronization as we only have max 2 threads
>>> competing for the queue so it doesn't make sense to use a non blocking
>>> queue. Norman, please overview, and feel free to revert if you don't like
>>> the solution (i thought it was better to simply commit instead of opening a
>>> JIRA to show you this).
>>>
>>> Modified:
>>>
>>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>>
>>> Modified:
>>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>> URL:
>>> http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>>>
>>> ==============================================================================
>>> ---
>>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>> (original)
>>> +++
>>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>> Wed Dec 21 14:47:25 2011
>>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>>>   import java.io.InputStream;
>>>   import java.io.UnsupportedEncodingException;
>>>   import java.util.List;
>>> -import java.util.concurrent.ConcurrentLinkedQueue;
>>> -import java.util.concurrent.atomic.AtomicBoolean;
>>> -
>>> +import java.util.Queue;
>>> +import java.util.concurrent.LinkedBlockingQueue;
>>>
>>>   import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>>>
>>> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>>>
>>>
>>>       // TODO: Should we limit the size ?
>>> -    private final ConcurrentLinkedQueue<Response>    responses = new
>>> ConcurrentLinkedQueue<Response>();
>>> -    private final AtomicBoolean write = new AtomicBoolean(false);
>>> +    private final Queue<Response>    responses = new
>>> LinkedBlockingQueue<Response>();
>>> +    private volatile boolean isAsync = false;
>>>
>>>       /**
>>>        * @see
>>> org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
>>> org.apache.james.protocols.api.ProtocolSession)
>>>        */
>>>       public final void writeResponse(Response response, final
>>> ProtocolSession session) {
>>> -        // just add the response to the queue. We will trigger the write
>>> operation later
>>> -        responses.add(response);
>>> -
>>> -        // trigger the write
>>> -        writeQueuedResponses(session);
>>> +        // if we already in asynchrnous mode we simply enqueue the
>>> response
>>> +        // we do this synchronously because we may have a dequeuer thread
>>> working on
>>> +        // isAsync and responses.
>>> +        boolean enqueued = false;
>>> +        synchronized(this) {
>>> +            if (isAsync == true) {
>>> +                responses.offer(response);
>>> +                enqueued = true;
>>> +            }
>>> +        }
>>> +
>>> +        // if we didn't enqueue then we check if the response is writable
>>> or we have to
>>> +        // set us "asynchrnous" and wait for response to be ready.
>>> +        if (!enqueued) {
>>> +            if (isResponseWritable(response)) {
>>> +                writeResponseToClient(response, session);
>>> +            } else {
>>> +                addDequeuerListener(response, session);
>>> +                isAsync = true;
>>> +            }
>>> +        }
>>>       }
>>>
>>>       /**
>>> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>>>        * @param session
>>>        */
>>>       private  void writeQueuedResponses(final ProtocolSession session) {
>>> -        Response queuedResponse = null;
>>>
>>> -        if (write.compareAndSet(false, true)){
>>> -            boolean listenerAdded = false;
>>> -            // dequeue Responses until non is left
>>> -            while ((queuedResponse = responses.poll()) != null) {
>>> -
>>> -                // check if we need to take special care of
>>> FutureResponses
>>> -                if (queuedResponse instanceof FutureResponse) {
>>> -                    FutureResponse futureResponse =(FutureResponse)
>>> queuedResponse;
>>> -                    if (futureResponse.isReady()) {
>>> -                        // future is ready so we can write it without
>>> blocking the IO-Thread
>>> -                        writeResponseToClient(queuedResponse, session);
>>> -                    } else {
>>> -
>>> -                        // future is not ready so we need to write it via
>>> a ResponseListener otherwise we MAY block the IO-Thread
>>> -                        futureResponse.addListener(new ResponseListener()
>>> {
>>> -
>>> -                            public void onResponse(FutureResponse
>>> response) {
>>> -                                writeResponseToClient(response, session);
>>> -                                if (write.compareAndSet(true, false)) {
>>> -                                    writeQueuedResponses(session);
>>> -                                }
>>> -                            }
>>> -                        });
>>> -                        listenerAdded = true;
>>> -                        // just break here as we will trigger the dequeue
>>> later
>>> -                        break;
>>> -                    }
>>> -
>>> -                } else {
>>> -                    // the Response is not a FutureResponse, so just
>>> write it back the the remote peer
>>> -                    writeResponseToClient(queuedResponse, session);
>>> +        // dequeue Responses until non is left
>>> +        while (true) {
>>> +
>>> +            Response queuedResponse = null;
>>> +
>>> +            // synchrnously we check responses and if it is empty we move
>>> back to non asynch
>>> +            // behaviour
>>> +            synchronized(this) {
>>> +                queuedResponse = responses.poll();
>>> +                if (queuedResponse == null) {
>>> +                    isAsync = false;
>>> +                    break;
>>>                   }
>>> -
>>>               }
>>> -            // Check if a ResponseListener was added before. If not we
>>> can allow to write
>>> -            // responses again. Otherwise the writing will get triggered
>>> from the listener
>>> -            if (listenerAdded == false) {
>>> -                write.set(false);
>>> +
>>> +            // if we have something in the queue we continue writing
>>> until we
>>> +            // find something asynchronous.
>>> +            if (isResponseWritable(queuedResponse)) {
>>> +                writeResponseToClient(queuedResponse, session);
>>> +            } else {
>>> +                addDequeuerListener(queuedResponse, session);
>>> +                // no changes to isAsync here, because in this method we
>>> are always already async.
>>> +                break;
>>>               }
>>>           }
>>> -
>>> -
>>> +    }
>>> +
>>> +    private boolean isResponseWritable(Response response) {
>>> +        return !(response instanceof FutureResponse) || ((FutureResponse)
>>> response).isReady();
>>> +    }
>>> +
>>> +    private void addDequeuerListener(Response response, final
>>> ProtocolSession session) {
>>> +        ((FutureResponse) response).addListener(new ResponseListener() {
>>> +
>>> +            public void onResponse(FutureResponse response) {
>>> +                writeResponseToClient(response, session);
>>> +                writeQueuedResponses(session);
>>> +            }
>>> +        });
>>>       }
>>>
>>>       /**
>>>
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>>> For additional commands, e-mail: server-dev-help@james.apache.org
>>>
>>
>> --
>> Eric http://about.echarles.net
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>> For additional commands, e-mail: server-dev-help@james.apache.org
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
> For additional commands, e-mail: server-dev-help@james.apache.org
>

-- 
Eric http://about.echarles.net

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Stefano Bagnara <ap...@bago.org>.
2011/12/24 Eric Charles <er...@apache.org>:
> Hi Stephano,
>
> Opening the discussion to learn more :)
>
> - Why are you considering that 2 threads is a criteria to use standard
> synchronization rather than some atomic fields.

It is a criterion to not use the ConcurrentLinkedQueue, which is a
structure designed to handle many concurrent threads and is overkill
for 2 threads.

> - I can understand you replace a concurrent by a non-concurrent queue.
> However, you now have a blocking queue. Is there an impact due to this
> blocking aspect?

I think the answer is no. That's why I did that.
Remember we have 2 threads and they do 2 different things, they simply
block each other when they add or remove from the queue.

> - You defined isAsync as volatile and sometimes encapsulate access to
> isAsync in a synchronized block, sometime not. Why using 2 different
> thread-safety strategies in this class?

Because sometimes it needed synchronization, other times I felt it was
not needed (the access to the volatile doesn't need synchronization; I
just synchronize to ensure that the change to the list happens
together with the change in the volatile var).
If you can find a better solution you're welcome to provide one. It
took a couple of hours to reach a working solution.

The previous one was not thread safe at this line:
------
if (listenerAdded == false) {
  write.set(false);
-----
It could happen that another thread had already added a new item to the
queue but skipped processing it because write was true. So we ended up
with an item in the queue never written.

I don't like too much my solution and I felt it a bit hackish, but
that was my best solution for my limited time, so if you can provide a
more elegant solution while still being thread safe, I'm more than
happy :-)

Stefano

>
> Thx,
>
> Eric
>
>
>
> On 21/12/11 15:47, bago@apache.org wrote:
>>
>> Author: bago
>> Date: Wed Dec 21 14:47:25 2011
>> New Revision: 1221748
>>
>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>> Log:
>> An attempt to refactor AbstractProtocolTransport to be thread safe. I
>> moved back to standard synchronization as we only have max 2 threads
>> competing for the queue so it doesn't make sense to use a non blocking
>> queue. Norman, please overview, and feel free to revert if you don't like
>> the solution (i thought it was better to simply commit instead of opening a
>> JIRA to show you this).
>>
>> Modified:
>>
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>
>> Modified:
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> URL:
>> http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>>
>> ==============================================================================
>> ---
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> (original)
>> +++
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> Wed Dec 21 14:47:25 2011
>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>>  import java.io.InputStream;
>>  import java.io.UnsupportedEncodingException;
>>  import java.util.List;
>> -import java.util.concurrent.ConcurrentLinkedQueue;
>> -import java.util.concurrent.atomic.AtomicBoolean;
>> -
>> +import java.util.Queue;
>> +import java.util.concurrent.LinkedBlockingQueue;
>>
>>  import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>>
>> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>>
>>
>>      // TODO: Should we limit the size ?
>> -    private final ConcurrentLinkedQueue<Response>  responses = new
>> ConcurrentLinkedQueue<Response>();
>> -    private final AtomicBoolean write = new AtomicBoolean(false);
>> +    private final Queue<Response>  responses = new
>> LinkedBlockingQueue<Response>();
>> +    private volatile boolean isAsync = false;
>>
>>      /**
>>       * @see
>> org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
>> org.apache.james.protocols.api.ProtocolSession)
>>       */
>>      public final void writeResponse(Response response, final
>> ProtocolSession session) {
>> -        // just add the response to the queue. We will trigger the write
>> operation later
>> -        responses.add(response);
>> -
>> -        // trigger the write
>> -        writeQueuedResponses(session);
>> +        // if we already in asynchrnous mode we simply enqueue the
>> response
>> +        // we do this synchronously because we may have a dequeuer thread
>> working on
>> +        // isAsync and responses.
>> +        boolean enqueued = false;
>> +        synchronized(this) {
>> +            if (isAsync == true) {
>> +                responses.offer(response);
>> +                enqueued = true;
>> +            }
>> +        }
>> +
>> +        // if we didn't enqueue then we check if the response is writable
>> or we have to
>> +        // set us "asynchrnous" and wait for response to be ready.
>> +        if (!enqueued) {
>> +            if (isResponseWritable(response)) {
>> +                writeResponseToClient(response, session);
>> +            } else {
>> +                addDequeuerListener(response, session);
>> +                isAsync = true;
>> +            }
>> +        }
>>      }
>>
>>      /**
>> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>>       * @param session
>>       */
>>      private  void writeQueuedResponses(final ProtocolSession session) {
>> -        Response queuedResponse = null;
>>
>> -        if (write.compareAndSet(false, true)){
>> -            boolean listenerAdded = false;
>> -            // dequeue Responses until non is left
>> -            while ((queuedResponse = responses.poll()) != null) {
>> -
>> -                // check if we need to take special care of
>> FutureResponses
>> -                if (queuedResponse instanceof FutureResponse) {
>> -                    FutureResponse futureResponse =(FutureResponse)
>> queuedResponse;
>> -                    if (futureResponse.isReady()) {
>> -                        // future is ready so we can write it without
>> blocking the IO-Thread
>> -                        writeResponseToClient(queuedResponse, session);
>> -                    } else {
>> -
>> -                        // future is not ready so we need to write it via
>> a ResponseListener otherwise we MAY block the IO-Thread
>> -                        futureResponse.addListener(new ResponseListener()
>> {
>> -
>> -                            public void onResponse(FutureResponse
>> response) {
>> -                                writeResponseToClient(response, session);
>> -                                if (write.compareAndSet(true, false)) {
>> -                                    writeQueuedResponses(session);
>> -                                }
>> -                            }
>> -                        });
>> -                        listenerAdded = true;
>> -                        // just break here as we will trigger the dequeue
>> later
>> -                        break;
>> -                    }
>> -
>> -                } else {
>> -                    // the Response is not a FutureResponse, so just
>> write it back the the remote peer
>> -                    writeResponseToClient(queuedResponse, session);
>> +        // dequeue Responses until non is left
>> +        while (true) {
>> +
>> +            Response queuedResponse = null;
>> +
>> +            // synchrnously we check responses and if it is empty we move
>> back to non asynch
>> +            // behaviour
>> +            synchronized(this) {
>> +                queuedResponse = responses.poll();
>> +                if (queuedResponse == null) {
>> +                    isAsync = false;
>> +                    break;
>>                  }
>> -
>>              }
>> -            // Check if a ResponseListener was added before. If not we
>> can allow to write
>> -            // responses again. Otherwise the writing will get triggered
>> from the listener
>> -            if (listenerAdded == false) {
>> -                write.set(false);
>> +
>> +            // if we have something in the queue we continue writing
>> until we
>> +            // find something asynchronous.
>> +            if (isResponseWritable(queuedResponse)) {
>> +                writeResponseToClient(queuedResponse, session);
>> +            } else {
>> +                addDequeuerListener(queuedResponse, session);
>> +                // no changes to isAsync here, because in this method we
>> are always already async.
>> +                break;
>>              }
>>          }
>> -
>> -
>> +    }
>> +
>> +    private boolean isResponseWritable(Response response) {
>> +        return !(response instanceof FutureResponse) || ((FutureResponse)
>> response).isReady();
>> +    }
>> +
>> +    private void addDequeuerListener(Response response, final
>> ProtocolSession session) {
>> +        ((FutureResponse) response).addListener(new ResponseListener() {
>> +
>> +            public void onResponse(FutureResponse response) {
>> +                writeResponseToClient(response, session);
>> +                writeQueuedResponses(session);
>> +            }
>> +        });
>>      }
>>
>>      /**
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>> For additional commands, e-mail: server-dev-help@james.apache.org
>>
>
> --
> Eric http://about.echarles.net
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
> For additional commands, e-mail: server-dev-help@james.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Eric Charles <er...@apache.org>.
On 24/12/11 16:31, Norman Maurer wrote:
>>>> - I can understand you replace a concurrent by a non-concurrent queue.
> However, you now have a blocking queue. Is there an impact due to this
> blocking aspect?
>>>
>>> Nope there is not as we not use the blocking methods. We could even
> replace it with a LinkedList.
>>>
>>
>> So why not replace with a LinkedList to make things crystal clear :)
>
> Wie should maybe bound the queue, for this we need the current impl.

Do you mean we should limit the number of entries?
How to define that capacity?

-- 
Eric http://about.echarles.net

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Norman Maurer <no...@googlemail.com>.
Hi there,

And again, see inside.

Am Samstag, 24. Dezember 2011 schrieb Eric Charles <er...@apache.org>:
> Hi Norman,
> Thx for inputs. comment/confirmation inside.
> Eric
>
> On 24/12/11 11:53, Norman Maurer wrote:
>>
>> Hi Eric,
>>
>> comments inside....
>>
>> Am 24.12.2011 um 10:05 schrieb Eric Charles<er...@apache.org>:
>>
>>> Hi Stephano,
>>>
>>> Opening the discussion to learn more :)
>>>
>>> - Why are you considering that 2 threads is a criteria to use standard
synchronization rather than some atomic fields.
>>>
>> If you only have a small count of concurrent threads its not slower to
use synchronization as the context switching will not happen often..
>>
>>
>
> Not slower, but not faster.

It will be faster most of the time as you don't have the CAS overhead.

>
>
>>> - I can understand you replace a concurrent by a non-concurrent queue.
However, you now have a blocking queue. Is there an impact due to this
blocking aspect?
>>
>> Nope there is not as we not use the blocking methods. We could even
replace it with a LinkedList.
>>
>
> So why not replace with a LinkedList to make things crystal clear :)

We should maybe bound the queue; for this we need the current impl.
>
>>>
>>> - You defined isAsync as volatile and sometimes encapsulate access to
isAsync in a synchronized block, sometime not. Why using 2 different
thread-safety strategies in this class?
>>>
>> If you only need to access a "status flag" ina concurrent way then its
more cheap to just use a volatile for it. If you need to update more then
one field in a "atomic" way you need synchronized. Updating a volatile in a
synchronized is not a problem...
>>
>
> Sure. I will further look at the usage context (the 2 user threads) to
have a better idea.
>
> Thx again Norman,
>
> Eric
>
>
> Thx,
>
> Eric
>

Bye,
Norman

>
> Hope it helps,
> Norman
>
>
> On 21/12/11 15:47, bago@apache.org wrote:
>
> Author: bago
> Date: Wed Dec 21 14:47:25 2011
> New Revision: 1221748
>
> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
> Log:
> An attempt to refactor AbstractProtocolTransport to be thread safe. I
moved back to standard synchronization as we only have max 2 threads
competing for the queue so it doesn't make sense to use a non blocking
queue. Norman, please overview, and feel free to revert if you don't like
the solution (i thought it was better to simply commit instead of opening a
JIRA to show you this).
>
> Modified:
>
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>
> Modified:
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
> URL:
http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>
==============================================================================
> ---
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
(original)
> +++
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Wed Dec 21 14:47:25 2011
> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>  import java.io.InputStream;
>  import java.io.UnsupportedEncodingException;
>  import java.util.List;
> -import java.util.concurrent.ConcurrentLinkedQueue;
> -import java.util.concurrent.atomic.AtomicBoolean;
> -
> +import java.util.Queue;
> +import java.util.concurrent.LinkedBlockingQueue;
>
>  import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>
> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>
>
>      // TODO: Should we limit the size ?
> -    private final ConcurrentLinkedQueue<Response>   responses = new
ConcurrentLinkedQueue<Response>();
> -    private final AtomicBoolean write = new AtomicBoolean(false);
> +    private final Queue<Response>   responses = new
LinkedBlockingQueue<Response>();
> +    private volatile boolean isAsync = false;
>
>      /**
>       * @see
org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
org.apache.james.protocols.api.ProtocolSession)
>       */
>      public final void writeResponse(Response response, final
ProtocolSession session) {
> -        // just add the response to the queue. We will trigger the write
operation later
> -        responses.add(response);
> -
> -        // trigger the write
> -        writeQueuedResponses(session);
> +        // if we already in asynchrnous mode we simply enqueue the
response
> +        // we do this synchronously because we may have a dequeuer
thread working on
> +        // isAsync and responses.
> +        boolean enqueued = false;
> +        synchronized(this) {
> +            if (isAsync == true) {
> +                responses.offer(response);
> +                enqueued = true;
> +            }
> +        }
> +
> +        // if we didn't enqueue then we check if the response is
writable or we have to
> +        // set us "asynchrnous" and wait for response to be ready.
> +        if (!enqueued) {
> +            if (isResponseWritable(response)) {
> +                writeResponseToClient(response, session);
> +            } else {
> +                addDequeuerListener(response, session);
> +                isAsync = true;
> +            }
> +        }
>      }
>
>      /**
> @@ -65,50 +80,46 @@ public abstrac

Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Eric Charles <er...@apache.org>.
Hi Norman,
Thx for inputs. comment/confirmation inside.
Eric

On 24/12/11 11:53, Norman Maurer wrote:
> Hi Eric,
>
> comments inside....
>
> Am 24.12.2011 um 10:05 schrieb Eric Charles<er...@apache.org>:
>
>> Hi Stephano,
>>
>> Opening the discussion to learn more :)
>>
>> - Why are you considering that 2 threads is a criteria to use standard synchronization rather than some atomic fields.
>>
> If you only have a small count of concurrent threads its not slower to use synchronization as the context switching will not happen often..
>
>

Not slower, but not faster.


>> - I can understand you replace a concurrent by a non-concurrent queue. However, you now have a blocking queue. Is there an impact due to this blocking aspect?
>
> Nope there is not as we not use the blocking methods. We could even replace it with a LinkedList.
>

So why not replace with a LinkedList to make things crystal clear :)

>>
>> - You defined isAsync as volatile and sometimes encapsulate access to isAsync in a synchronized block, sometime not. Why using 2 different thread-safety strategies in this class?
>>
> If you only need to access a "status flag" ina concurrent way then its more cheap to just use a volatile for it. If you need to update more then one field in a "atomic" way you need synchronized. Updating a volatile in a synchronized is not a problem...
>

Sure. I will further look at the usage context (the 2 user threads) to 
have a better idea.

Thx again Norman,

Eric

>
>> Thx,
>>
>> Eric
>>
>>
> Hope it helps,
> Norman
>
>
>> On 21/12/11 15:47, bago@apache.org wrote:
>>> Author: bago
>>> Date: Wed Dec 21 14:47:25 2011
>>> New Revision: 1221748
>>>
>>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>>> Log:
>>> An attempt to refactor AbstractProtocolTransport to be thread safe. I moved back to standard synchronization as we only have max 2 threads competing for the queue so it doesn't make sense to use a non blocking queue. Norman, please overview, and feel free to revert if you don't like the solution (i thought it was better to simply commit instead of opening a JIRA to show you this).
>>>
>>> Modified:
>>>      james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>>
>>> Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>> URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>>> ==============================================================================
>>> --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (original)
>>> +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Dec 21 14:47:25 2011
>>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>>>   import java.io.InputStream;
>>>   import java.io.UnsupportedEncodingException;
>>>   import java.util.List;
>>> -import java.util.concurrent.ConcurrentLinkedQueue;
>>> -import java.util.concurrent.atomic.AtomicBoolean;
>>> -
>>> +import java.util.Queue;
>>> +import java.util.concurrent.LinkedBlockingQueue;
>>>
>>>   import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>>>
>>> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>>>
>>>
>>>       // TODO: Should we limit the size ?
>>> -    private final ConcurrentLinkedQueue<Response>   responses = new ConcurrentLinkedQueue<Response>();
>>> -    private final AtomicBoolean write = new AtomicBoolean(false);
>>> +    private final Queue<Response>   responses = new LinkedBlockingQueue<Response>();
>>> +    private volatile boolean isAsync = false;
>>>
>>>       /**
>>>        * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession)
>>>        */
>>>       public final void writeResponse(Response response, final ProtocolSession session) {
>>> -        // just add the response to the queue. We will trigger the write operation later
>>> -        responses.add(response);
>>> -
>>> -        // trigger the write
>>> -        writeQueuedResponses(session);
>>> +        // if we already in asynchrnous mode we simply enqueue the response
>>> +        // we do this synchronously because we may have a dequeuer thread working on
>>> +        // isAsync and responses.
>>> +        boolean enqueued = false;
>>> +        synchronized(this) {
>>> +            if (isAsync == true) {
>>> +                responses.offer(response);
>>> +                enqueued = true;
>>> +            }
>>> +        }
>>> +
>>> +        // if we didn't enqueue then we check if the response is writable or we have to
>>> +        // set us "asynchrnous" and wait for response to be ready.
>>> +        if (!enqueued) {
>>> +            if (isResponseWritable(response)) {
>>> +                writeResponseToClient(response, session);
>>> +            } else {
>>> +                addDequeuerListener(response, session);
>>> +                isAsync = true;
>>> +            }
>>> +        }
>>>       }
>>>
>>>       /**
>>> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>>>        * @param session
>>>        */
>>>       private  void writeQueuedResponses(final ProtocolSession session) {
>>> -        Response queuedResponse = null;
>>>
>>> -        if (write.compareAndSet(false, true)){
>>> -            boolean listenerAdded = false;
>>> -            // dequeue Responses until non is left
>>> -            while ((queuedResponse = responses.poll()) != null) {
>>> -
>>> -                // check if we need to take special care of FutureResponses
>>> -                if (queuedResponse instanceof FutureResponse) {
>>> -                    FutureResponse futureResponse =(FutureResponse) queuedResponse;
>>> -                    if (futureResponse.isReady()) {
>>> -                        // future is ready so we can write it without blocking the IO-Thread
>>> -                        writeResponseToClient(queuedResponse, session);
>>> -                    } else {
>>> -
>>> -                        // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread
>>> -                        futureResponse.addListener(new ResponseListener() {
>>> -
>>> -                            public void onResponse(FutureResponse response) {
>>> -                                writeResponseToClient(response, session);
>>> -                                if (write.compareAndSet(true, false)) {
>>> -                                    writeQueuedResponses(session);
>>> -                                }
>>> -                            }
>>> -                        });
>>> -                        listenerAdded = true;
>>> -                        // just break here as we will trigger the dequeue later
>>> -                        break;
>>> -                    }
>>> -
>>> -                } else {
>>> -                    // the Response is not a FutureResponse, so just write it back the the remote peer
>>> -                    writeResponseToClient(queuedResponse, session);
>>> +        // dequeue Responses until non is left
>>> +        while (true) {
>>> +
>>> +            Response queuedResponse = null;
>>> +
>>> +            // synchrnously we check responses and if it is empty we move back to non asynch
>>> +            // behaviour
>>> +            synchronized(this) {
>>> +                queuedResponse = responses.poll();
>>> +                if (queuedResponse == null) {
>>> +                    isAsync = false;
>>> +                    break;
>>>                   }
>>> -
>>>               }
>>> -            // Check if a ResponseListener was added before. If not we can allow to write
>>> -            // responses again. Otherwise the writing will get triggered from the listener
>>> -            if (listenerAdded == false) {
>>> -                write.set(false);
>>> +
>>> +            // if we have something in the queue we continue writing until we
>>> +            // find something asynchronous.
>>> +            if (isResponseWritable(queuedResponse)) {
>>> +                writeResponseToClient(queuedResponse, session);
>>> +            } else {
>>> +                addDequeuerListener(queuedResponse, session);
>>> +                // no changes to isAsync here, because in this method we are always already async.
>>> +                break;
>>>               }
>>>           }
>>> -
>>> -
>>> +    }
>>> +
>>> +    private boolean isResponseWritable(Response response) {
>>> +        return !(response instanceof FutureResponse) || ((FutureResponse) response).isReady();
>>> +    }
>>> +
>>> +    private void addDequeuerListener(Response response, final ProtocolSession session) {
>>> +        ((FutureResponse) response).addListener(new ResponseListener() {
>>> +
>>> +            public void onResponse(FutureResponse response) {
>>> +                writeResponseToClient(response, session);
>>> +                writeQueuedResponses(session);
>>> +            }
>>> +        });
>>>       }
>>>
>>>       /**
>>>
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>>> For additional commands, e-mail: server-dev-help@james.apache.org
>>>
>>
>> --
>> Eric http://about.echarles.net
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>> For additional commands, e-mail: server-dev-help@james.apache.org
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
> For additional commands, e-mail: server-dev-help@james.apache.org
>

-- 
Eric http://about.echarles.net

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Norman Maurer <no...@googlemail.com>.
Hi Eric,

comments inside....

Am 24.12.2011 um 10:05 schrieb Eric Charles <er...@apache.org>:

> Hi Stephano,
> 
> Opening the discussion to learn more :)
> 
> - Why are you considering that 2 threads is a criteria to use standard synchronization rather than some atomic fields.
> 
If you only have a small number of concurrent threads, it's not slower to use synchronization, as context switching will not happen often.


> - I can understand you replace a concurrent by a non-concurrent queue. However, you now have a blocking queue. Is there an impact due to this blocking aspect?

Nope, there is not, as we do not use the blocking methods. We could even replace it with a LinkedList.

> 
> - You defined isAsync as volatile and sometimes encapsulate access to isAsync in a synchronized block, sometime not. Why using 2 different thread-safety strategies in this class?
> 
If you only need to access a "status flag" in a concurrent way, then it's cheaper to just use a volatile for it. If you need to update more than one field in an "atomic" way, you need synchronized. Updating a volatile inside a synchronized block is not a problem...


> Thx,
> 
> Eric
> 
> 
Hope it helps,
Norman


> On 21/12/11 15:47, bago@apache.org wrote:
>> Author: bago
>> Date: Wed Dec 21 14:47:25 2011
>> New Revision: 1221748
>> 
>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>> Log:
>> An attempt to refactor AbstractProtocolTransport to be thread safe. I moved back to standard synchronization as we only have max 2 threads competing for the queue so it doesn't make sense to use a non blocking queue. Norman, please overview, and feel free to revert if you don't like the solution (i thought it was better to simply commit instead of opening a JIRA to show you this).
>> 
>> Modified:
>>     james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> 
>> Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>> ==============================================================================
>> --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (original)
>> +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Dec 21 14:47:25 2011
>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>>  import java.io.InputStream;
>>  import java.io.UnsupportedEncodingException;
>>  import java.util.List;
>> -import java.util.concurrent.ConcurrentLinkedQueue;
>> -import java.util.concurrent.atomic.AtomicBoolean;
>> -
>> +import java.util.Queue;
>> +import java.util.concurrent.LinkedBlockingQueue;
>> 
>>  import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>> 
>> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>> 
>> 
>>      // TODO: Should we limit the size ?
>> -    private final ConcurrentLinkedQueue<Response>  responses = new ConcurrentLinkedQueue<Response>();
>> -    private final AtomicBoolean write = new AtomicBoolean(false);
>> +    private final Queue<Response>  responses = new LinkedBlockingQueue<Response>();
>> +    private volatile boolean isAsync = false;
>> 
>>      /**
>>       * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession)
>>       */
>>      public final void writeResponse(Response response, final ProtocolSession session) {
>> -        // just add the response to the queue. We will trigger the write operation later
>> -        responses.add(response);
>> -
>> -        // trigger the write
>> -        writeQueuedResponses(session);
>> +        // if we already in asynchrnous mode we simply enqueue the response
>> +        // we do this synchronously because we may have a dequeuer thread working on
>> +        // isAsync and responses.
>> +        boolean enqueued = false;
>> +        synchronized(this) {
>> +            if (isAsync == true) {
>> +                responses.offer(response);
>> +                enqueued = true;
>> +            }
>> +        }
>> +
>> +        // if we didn't enqueue then we check if the response is writable or we have to
>> +        // set us "asynchrnous" and wait for response to be ready.
>> +        if (!enqueued) {
>> +            if (isResponseWritable(response)) {
>> +                writeResponseToClient(response, session);
>> +            } else {
>> +                addDequeuerListener(response, session);
>> +                isAsync = true;
>> +            }
>> +        }
>>      }
>> 
>>      /**
>> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>>       * @param session
>>       */
>>      private  void writeQueuedResponses(final ProtocolSession session) {
>> -        Response queuedResponse = null;
>> 
>> -        if (write.compareAndSet(false, true)){
>> -            boolean listenerAdded = false;
>> -            // dequeue Responses until non is left
>> -            while ((queuedResponse = responses.poll()) != null) {
>> -
>> -                // check if we need to take special care of FutureResponses
>> -                if (queuedResponse instanceof FutureResponse) {
>> -                    FutureResponse futureResponse =(FutureResponse) queuedResponse;
>> -                    if (futureResponse.isReady()) {
>> -                        // future is ready so we can write it without blocking the IO-Thread
>> -                        writeResponseToClient(queuedResponse, session);
>> -                    } else {
>> -
>> -                        // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread
>> -                        futureResponse.addListener(new ResponseListener() {
>> -
>> -                            public void onResponse(FutureResponse response) {
>> -                                writeResponseToClient(response, session);
>> -                                if (write.compareAndSet(true, false)) {
>> -                                    writeQueuedResponses(session);
>> -                                }
>> -                            }
>> -                        });
>> -                        listenerAdded = true;
>> -                        // just break here as we will trigger the dequeue later
>> -                        break;
>> -                    }
>> -
>> -                } else {
>> -                    // the Response is not a FutureResponse, so just write it back the the remote peer
>> -                    writeResponseToClient(queuedResponse, session);
>> +        // dequeue Responses until non is left
>> +        while (true) {
>> +
>> +            Response queuedResponse = null;
>> +
>> +            // synchrnously we check responses and if it is empty we move back to non asynch
>> +            // behaviour
>> +            synchronized(this) {
>> +                queuedResponse = responses.poll();
>> +                if (queuedResponse == null) {
>> +                    isAsync = false;
>> +                    break;
>>                  }
>> -
>>              }
>> -            // Check if a ResponseListener was added before. If not we can allow to write
>> -            // responses again. Otherwise the writing will get triggered from the listener
>> -            if (listenerAdded == false) {
>> -                write.set(false);
>> +
>> +            // if we have something in the queue we continue writing until we
>> +            // find something asynchronous.
>> +            if (isResponseWritable(queuedResponse)) {
>> +                writeResponseToClient(queuedResponse, session);
>> +            } else {
>> +                addDequeuerListener(queuedResponse, session);
>> +                // no changes to isAsync here, because in this method we are always already async.
>> +                break;
>>              }
>>          }
>> -
>> -
>> +    }
>> +
>> +    private boolean isResponseWritable(Response response) {
>> +        return !(response instanceof FutureResponse) || ((FutureResponse) response).isReady();
>> +    }
>> +
>> +    private void addDequeuerListener(Response response, final ProtocolSession session) {
>> +        ((FutureResponse) response).addListener(new ResponseListener() {
>> +
>> +            public void onResponse(FutureResponse response) {
>> +                writeResponseToClient(response, session);
>> +                writeQueuedResponses(session);
>> +            }
>> +        });
>>      }
>> 
>>      /**
>> 
>> 
>> 
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>> For additional commands, e-mail: server-dev-help@james.apache.org
>> 
> 
> -- 
> Eric http://about.echarles.net
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
> For additional commands, e-mail: server-dev-help@james.apache.org
> 

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

Posted by Eric Charles <er...@apache.org>.
Hi Stephano,

Opening the discussion to learn more :)

- Why do you consider having only 2 threads a criterion for using standard 
synchronization rather than some atomic fields?

- I can understand you replace a concurrent by a non-concurrent queue. 
However, you now have a blocking queue. Is there an impact due to this 
blocking aspect?

- You defined isAsync as volatile and sometimes encapsulate access to 
isAsync in a synchronized block, sometimes not. Why use 2 different 
thread-safety strategies in this class?

Thx,

Eric


On 21/12/11 15:47, bago@apache.org wrote:
> Author: bago
> Date: Wed Dec 21 14:47:25 2011
> New Revision: 1221748
>
> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
> Log:
> An attempt to refactor AbstractProtocolTransport to be thread safe. I moved back to standard synchronization as we only have max 2 threads competing for the queue so it doesn't make sense to use a non blocking queue. Norman, please overview, and feel free to revert if you don't like the solution (i thought it was better to simply commit instead of opening a JIRA to show you this).
>
> Modified:
>      james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>
> Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
> URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
> ==============================================================================
> --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (original)
> +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Dec 21 14:47:25 2011
> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>   import java.io.InputStream;
>   import java.io.UnsupportedEncodingException;
>   import java.util.List;
> -import java.util.concurrent.ConcurrentLinkedQueue;
> -import java.util.concurrent.atomic.AtomicBoolean;
> -
> +import java.util.Queue;
> +import java.util.concurrent.LinkedBlockingQueue;
>
>   import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>
> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>
>
>       // TODO: Should we limit the size ?
> -    private final ConcurrentLinkedQueue<Response>  responses = new ConcurrentLinkedQueue<Response>();
> -    private final AtomicBoolean write = new AtomicBoolean(false);
> +    private final Queue<Response>  responses = new LinkedBlockingQueue<Response>();
> +    private volatile boolean isAsync = false;
>
>       /**
>        * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession)
>        */
>       public final void writeResponse(Response response, final ProtocolSession session) {
> -        // just add the response to the queue. We will trigger the write operation later
> -        responses.add(response);
> -
> -        // trigger the write
> -        writeQueuedResponses(session);
> +        // if we already in asynchrnous mode we simply enqueue the response
> +        // we do this synchronously because we may have a dequeuer thread working on
> +        // isAsync and responses.
> +        boolean enqueued = false;
> +        synchronized(this) {
> +            if (isAsync == true) {
> +                responses.offer(response);
> +                enqueued = true;
> +            }
> +        }
> +
> +        // if we didn't enqueue then we check if the response is writable or we have to
> +        // set us "asynchrnous" and wait for response to be ready.
> +        if (!enqueued) {
> +            if (isResponseWritable(response)) {
> +                writeResponseToClient(response, session);
> +            } else {
> +                addDequeuerListener(response, session);
> +                isAsync = true;
> +            }
> +        }
>       }
>
>       /**
> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>        * @param session
>        */
>       private  void writeQueuedResponses(final ProtocolSession session) {
> -        Response queuedResponse = null;
>
> -        if (write.compareAndSet(false, true)){
> -            boolean listenerAdded = false;
> -            // dequeue Responses until non is left
> -            while ((queuedResponse = responses.poll()) != null) {
> -
> -                // check if we need to take special care of FutureResponses
> -                if (queuedResponse instanceof FutureResponse) {
> -                    FutureResponse futureResponse =(FutureResponse) queuedResponse;
> -                    if (futureResponse.isReady()) {
> -                        // future is ready so we can write it without blocking the IO-Thread
> -                        writeResponseToClient(queuedResponse, session);
> -                    } else {
> -
> -                        // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread
> -                        futureResponse.addListener(new ResponseListener() {
> -
> -                            public void onResponse(FutureResponse response) {
> -                                writeResponseToClient(response, session);
> -                                if (write.compareAndSet(true, false)) {
> -                                    writeQueuedResponses(session);
> -                                }
> -                            }
> -                        });
> -                        listenerAdded = true;
> -                        // just break here as we will trigger the dequeue later
> -                        break;
> -                    }
> -
> -                } else {
> -                    // the Response is not a FutureResponse, so just write it back the the remote peer
> -                    writeResponseToClient(queuedResponse, session);
> +        // dequeue Responses until non is left
> +        while (true) {
> +
> +            Response queuedResponse = null;
> +
> +            // synchrnously we check responses and if it is empty we move back to non asynch
> +            // behaviour
> +            synchronized(this) {
> +                queuedResponse = responses.poll();
> +                if (queuedResponse == null) {
> +                    isAsync = false;
> +                    break;
>                   }
> -
>               }
> -            // Check if a ResponseListener was added before. If not we can allow to write
> -            // responses again. Otherwise the writing will get triggered from the listener
> -            if (listenerAdded == false) {
> -                write.set(false);
> +
> +            // if we have something in the queue we continue writing until we
> +            // find something asynchronous.
> +            if (isResponseWritable(queuedResponse)) {
> +                writeResponseToClient(queuedResponse, session);
> +            } else {
> +                addDequeuerListener(queuedResponse, session);
> +                // no changes to isAsync here, because in this method we are always already async.
> +                break;
>               }
>           }
> -
> -
> +    }
> +
> +    private boolean isResponseWritable(Response response) {
> +        return !(response instanceof FutureResponse) || ((FutureResponse) response).isReady();
> +    }
> +
> +    private void addDequeuerListener(Response response, final ProtocolSession session) {
> +        ((FutureResponse) response).addListener(new ResponseListener() {
> +
> +            public void onResponse(FutureResponse response) {
> +                writeResponseToClient(response, session);
> +                writeQueuedResponses(session);
> +            }
> +        });
>       }
>
>       /**
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
> For additional commands, e-mail: server-dev-help@james.apache.org
>

-- 
Eric http://about.echarles.net

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org