You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by Norman Maurer <no...@apache.org> on 2011/12/21 20:48:38 UTC

Re: svn commit: r1221748 -/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java

This makes sense! Thanks for taking care. I will cut a new release now that this is fixed. I think I will call it 1.6.0-RC1 as we are really close ;)

Bye,
Norman


-- 
Norman Maurer


Am Mittwoch, 21. Dezember 2011 um 15:47 schrieb bago@apache.org:

> Author: bago
> Date: Wed Dec 21 14:47:25 2011
> New Revision: 1221748
> 
> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
> Log:
> An attempt to refactor AbstractProtocolTransport to be thread safe. I moved back to standard synchronization as we only have max 2 threads competing for the queue so it doesn't make sense to use a non blocking queue. Norman, please overview, and feel free to revert if you don't like the solution (i thought it was better to simply commit instead of opening a JIRA to show you this).
> 
> Modified:
> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
> 
> Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
> URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
> ==============================================================================
> --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (original)
> +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Dec 21 14:47:25 2011
> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
> import java.io.InputStream;
> import java.io.UnsupportedEncodingException;
> import java.util.List;
> -import java.util.concurrent.ConcurrentLinkedQueue;
> -import java.util.concurrent.atomic.AtomicBoolean;
> -
> +import java.util.Queue;
> +import java.util.concurrent.LinkedBlockingQueue;
> 
> import org.apache.james.protocols.api.FutureResponse.ResponseListener;
> 
> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
> 
> 
> // TODO: Should we limit the size ?
> - private final ConcurrentLinkedQueue<Response> responses = new ConcurrentLinkedQueue<Response>();
> - private final AtomicBoolean write = new AtomicBoolean(false);
> + private final Queue<Response> responses = new LinkedBlockingQueue<Response>();
> + private volatile boolean isAsync = false;
> 
> /**
> * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession)
> */
> public final void writeResponse(Response response, final ProtocolSession session) {
> - // just add the response to the queue. We will trigger the write operation later
> - responses.add(response);
> - 
> - // trigger the write
> - writeQueuedResponses(session);
> + // if we already in asynchrnous mode we simply enqueue the response
> + // we do this synchronously because we may have a dequeuer thread working on
> + // isAsync and responses.
> + boolean enqueued = false;
> + synchronized(this) {
> + if (isAsync == true) {
> + responses.offer(response);
> + enqueued = true;
> + }
> + }
> + 
> + // if we didn't enqueue then we check if the response is writable or we have to 
> + // set us "asynchrnous" and wait for response to be ready.
> + if (!enqueued) {
> + if (isResponseWritable(response)) {
> + writeResponseToClient(response, session);
> + } else {
> + addDequeuerListener(response, session);
> + isAsync = true;
> + }
> + }
> }
> 
> /**
> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
> * @param session
> */
> private void writeQueuedResponses(final ProtocolSession session) {
> - Response queuedResponse = null;
> 
> - if (write.compareAndSet(false, true)){
> - boolean listenerAdded = false;
> - // dequeue Responses until non is left
> - while ((queuedResponse = responses.poll()) != null) {
> - 
> - // check if we need to take special care of FutureResponses
> - if (queuedResponse instanceof FutureResponse) {
> - FutureResponse futureResponse =(FutureResponse) queuedResponse;
> - if (futureResponse.isReady()) {
> - // future is ready so we can write it without blocking the IO-Thread
> - writeResponseToClient(queuedResponse, session);
> - } else {
> - 
> - // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread
> - futureResponse.addListener(new ResponseListener() {
> - 
> - public void onResponse(FutureResponse response) {
> - writeResponseToClient(response, session);
> - if (write.compareAndSet(true, false)) {
> - writeQueuedResponses(session);
> - }
> - }
> - });
> - listenerAdded = true;
> - // just break here as we will trigger the dequeue later
> - break;
> - }
> - 
> - } else {
> - // the Response is not a FutureResponse, so just write it back the the remote peer
> - writeResponseToClient(queuedResponse, session);
> + // dequeue Responses until non is left
> + while (true) {
> + 
> + Response queuedResponse = null;
> + 
> + // synchrnously we check responses and if it is empty we move back to non asynch
> + // behaviour
> + synchronized(this) {
> + queuedResponse = responses.poll();
> + if (queuedResponse == null) {
> + isAsync = false;
> + break;
> }
> - 
> }
> - // Check if a ResponseListener was added before. If not we can allow to write
> - // responses again. Otherwise the writing will get triggered from the listener
> - if (listenerAdded == false) {
> - write.set(false);
> +
> + // if we have something in the queue we continue writing until we
> + // find something asynchronous.
> + if (isResponseWritable(queuedResponse)) {
> + writeResponseToClient(queuedResponse, session);
> + } else {
> + addDequeuerListener(queuedResponse, session);
> + // no changes to isAsync here, because in this method we are always already async.
> + break;
> }
> }
> -
> - 
> + }
> + 
> + private boolean isResponseWritable(Response response) {
> + return !(response instanceof FutureResponse) || ((FutureResponse) response).isReady();
> + }
> + 
> + private void addDequeuerListener(Response response, final ProtocolSession session) {
> + ((FutureResponse) response).addListener(new ResponseListener() {
> + 
> + public void onResponse(FutureResponse response) {
> + writeResponseToClient(response, session);
> + writeQueuedResponses(session);
> + }
> + });
> }
> 
> /**
> 
> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org (mailto:server-dev-unsubscribe@james.apache.org)
> For additional commands, e-mail: server-dev-help@james.apache.org (mailto:server-dev-help@james.apache.org)
> 
>