You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by "Filip Hanik (mailing lists)" <de...@hanik.com> on 2012/07/06 09:17:28 UTC

RE: svn commit: r1358055 - in /tomcat/trunk: java/org/apache/catalina/connector/ java/org/apache/catalina/core/ java/org/apache/coyote/ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/ java/org/apache/tomcat/util/net

NIO Tests:    128
BIO Tests:    128
NIO Failures:     1
BIO Failures:    10

I'll take a look at test failures on Monday, unless someone beats me to it. 

The failures are intermittent: they don't occur when running individual tests, only when running the entire test suite.

Filip

> -----Original Message-----
> From: fhanik@apache.org [mailto:fhanik@apache.org]
> Sent: Friday, July 06, 2012 12:54 AM
> To: dev@tomcat.apache.org
> Subject: svn commit: r1358055 - in /tomcat/trunk:
> java/org/apache/catalina/connector/ java/org/apache/catalina/core/
> java/org/apache/coyote/ java/org/apache/coyote/ajp/
> java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/
> java/org/apache/tomcat/util/net/...
> 
> Author: fhanik
> Date: Fri Jul  6 06:53:52 2012
> New Revision: 1358055
> 
> URL: http://svn.apache.org/viewvc?rev=1358055&view=rev
> Log:
> implement rev 1 of async/non blocking writes
> 
> 
> Modified:
>     tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
> 
> tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
>     tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
>     tomcat/trunk/java/org/apache/catalina/connector/Request.java
>     tomcat/trunk/java/org/apache/catalina/connector/Response.java
>     tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
>     tomcat/trunk/java/org/apache/coyote/ActionCode.java
>     tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
>     tomcat/trunk/java/org/apache/coyote/Response.java
>     tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
>     tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
> 
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
>     tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
>     tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> 
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> 
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
>     tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
>     tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
>     tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
>     tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
> 
> tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.jav
> a
>     tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
> 
> Modified:
> tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> ctor/CoyoteAdapter.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
> (original)
> +++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
> Fri Jul  6 06:53:52 2012
> @@ -296,18 +296,15 @@ public class CoyoteAdapter implements Ad
>              }
> 
> 
> -            if (!request.isAsyncDispatching() && request.isAsync()) {
> -                AtomicBoolean result = new AtomicBoolean(true);
> -                req.action(ActionCode.ASYNC_DISPATCH_FOR_OPERATION,
> this);
> -                if (result.get()) {
> -                    if (status==SocketStatus.OPEN_WRITE) {
> -                        //TODO Notify write listener
> -                    } else if (status==SocketStatus.OPEN_READ) {
> -                        //TODO Notify read listener
> -                        asyncConImpl.canRead();
> -                    }
> -                    success = true;
> +            if (!request.isAsyncDispatching() && request.isAsync() &&
> request.isAsyncOperation()) {
> +                if (status == SocketStatus.OPEN_WRITE) {
> +                    // TODO Notify write listener
> +                    success = asyncConImpl.canWrite();
> +                } else if (status == SocketStatus.OPEN_READ) {
> +                    // TODO Notify read listener
> +                    success = asyncConImpl.canRead();
>                  }
> +
>              }
> 
>              if (request.isAsyncDispatching()) {
> 
> Modified:
> tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> ctor/CoyoteOutputStream.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> ---
> tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
> (original)
> +++
> tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
> Fri Jul  6 06:53:52 2012
> @@ -109,23 +109,15 @@ public class CoyoteOutputStream
>          ob.close();
>      }
> 
> -    /**
> -     * TODO SERVLET 3.1
> -     */
>      @Override
>      public boolean canWrite() {
> -        // TODO Auto-generated method stub
> -        return false;
> +        return ob.canWrite();
>      }
> 
> 
> -    /**
> -     * TODO SERVLET 3.1
> -     */
>      @Override
>      public void setWriteListener(WriteListener listener) {
> -        // TODO Auto-generated method stub
> -
> +        ob.setWriteListener(listener);
>      }
> 
> 
> 
> Modified:
> tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> ctor/OutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
> (original)
> +++ tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
> Fri Jul  6 06:53:52 2012
> @@ -23,6 +23,9 @@ import java.security.AccessController;
>  import java.security.PrivilegedActionException;
>  import java.security.PrivilegedExceptionAction;
>  import java.util.HashMap;
> +import java.util.concurrent.atomic.AtomicBoolean;
> +
> +import javax.servlet.WriteListener;
> 
>  import org.apache.catalina.Globals;
>  import org.apache.coyote.ActionCode;
> @@ -607,4 +610,25 @@ public class OutputBuffer extends Writer
>      }
> 
> 
> +    public boolean canWrite() {
> +        if (getWriteListener()==null) throw new
> IllegalStateException("not in non blocking mode.");
> +        AtomicBoolean canWrite = new AtomicBoolean(true);
> +        coyoteResponse.action(ActionCode.NB_WRITE_INTEREST, canWrite);
> +        return canWrite.get();
> +}
> +
> +
> +
> +    private volatile WriteListener listener;
> +    public void setWriteListener(WriteListener listener) {
> +        this.listener = listener;
> +        coyoteResponse.action(ActionCode.SET_WRITE_LISTENER, listener);
> +    }
> +
> +    public WriteListener getWriteListener() {
> +        return listener;
> +    }
> +
> +
> +
>  }
> 
> Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> ctor/Request.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/catalina/connector/Request.java
> (original)
> +++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Fri Jul
> 6 06:53:52 2012
> @@ -1653,6 +1653,16 @@ public class Request
>          return result.get();
>      }
> 
> +    public boolean isAsyncOperation() {
> +        if (asyncContext == null) {
> +            return false;
> +        }
> +
> +        AtomicBoolean result = new AtomicBoolean(false);
> +        coyoteRequest.action(ActionCode.ASYNC_IS_ASYNC_OPERATION,
> result);
> +        return result.get();
> +    }
> +
>      @Override
>      public boolean isAsyncSupported() {
>          if (this.asyncSupported == null) {
> 
> Modified: tomcat/trunk/java/org/apache/catalina/connector/Response.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> ctor/Response.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/catalina/connector/Response.java
> (original)
> +++ tomcat/trunk/java/org/apache/catalina/connector/Response.java Fri
> Jul  6 06:53:52 2012
> @@ -153,6 +153,10 @@ public class Response
>          outputBuffer.setResponse(coyoteResponse);
>      }
> 
> +    public org.apache.coyote.Response getCoyoteResponse() {
> +        return this.coyoteResponse;
> +    }
> +
> 
>      /**
>       * Return the Context within which this Request is being processed.
> 
> Modified:
> tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/
> AsyncContextImpl.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
> (original)
> +++ tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java Fri
> Jul  6 06:53:52 2012
> @@ -106,6 +106,8 @@ public class AsyncContextImpl implements
>      }
> 
>      public boolean canRead() throws IOException {
> +        if (request.getCoyoteRequest().getReadListener()==null) return
> false;
> +
>          ClassLoader oldCL =
> Thread.currentThread().getContextClassLoader();
>          ClassLoader newCL =
> request.getContext().getLoader().getClassLoader();
>          try {
> @@ -121,7 +123,16 @@ public class AsyncContextImpl implements
>      }
> 
>      public boolean canWrite() throws IOException {
> -        return false;
> +        if
> (request.getResponse().getCoyoteResponse().getWriteListener()==null)
> return false;
> +        ClassLoader oldCL =
> Thread.currentThread().getContextClassLoader();
> +        ClassLoader newCL =
> request.getContext().getLoader().getClassLoader();
> +        try {
> +            Thread.currentThread().setContextClassLoader(newCL);
> +
> request.getResponse().getCoyoteResponse().getWriteListener().onWritePoss
> ible();
> +        }finally {
> +            Thread.currentThread().setContextClassLoader(oldCL);
> +    }
> +        return true;
>      }
> 
>      public boolean timeout() throws IOException {
> 
> Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionC
> ode.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
> +++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Fri Jul  6
> 06:53:52 2012
> @@ -183,6 +183,11 @@ public enum ActionCode {
>      ASYNC_IS_ASYNC,
> 
>      /**
> +     * Callback to determine if async read/write is in progress
> +     */
> +    ASYNC_IS_ASYNC_OPERATION,
> +
> +    /**
>       * Callback to determine if async dispatch is in progress
>       */
>      ASYNC_IS_STARTED,
> 
> Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncSt
> ateMachine.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
> (original)
> +++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java Fri Jul
> 6 06:53:52 2012
> @@ -149,6 +149,10 @@ public class AsyncStateMachine<S> {
>          return state.isAsync();
>      }
> 
> +    public boolean isAsyncOperation() {
> +        return state == AsyncState.READ_WRITE_OP;
> +    }
> +
>      public boolean isAsyncDispatching() {
>          return state.isDispatching();
>      }
> 
> Modified: tomcat/trunk/java/org/apache/coyote/Response.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Respons
> e.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/coyote/Response.java (original)
> +++ tomcat/trunk/java/org/apache/coyote/Response.java Fri Jul  6
> 06:53:52 2012
> @@ -21,6 +21,11 @@ import java.io.IOException;
>  import java.io.StringReader;
>  import java.util.Locale;
> 
> +import javax.servlet.ReadListener;
> +import javax.servlet.WriteListener;
> +
> +import org.apache.coyote.http11.AbstractInputBuffer;
> +import org.apache.coyote.http11.AbstractOutputBuffer;
>  import org.apache.tomcat.util.buf.ByteChunk;
>  import org.apache.tomcat.util.http.MimeHeaders;
>  import org.apache.tomcat.util.http.parser.AstMediaType;
> @@ -541,4 +546,29 @@ public final class Response {
>          }
>          return outputBuffer.getBytesWritten();
>      }
> +
> +    protected volatile WriteListener listener;
> +
> +    public WriteListener getWriteListener() {
> +        return listener;
> +}
> +
> +    public void setWriteListener(WriteListener listener) {
> +        //TODO SERVLET 3.1 is it allowed to switch from non block to
> blocking?
> +        setBlocking(listener==null);
> +        this.listener = listener;
> +    }
> +
> +    protected volatile boolean blocking = true;
> +
> +    public boolean isBlocking() {
> +        return blocking;
> +    }
> +
> +    public void setBlocking(boolean blocking) throws
> IllegalStateException {
> +        @SuppressWarnings("rawtypes")
> +        AbstractOutputBuffer buf = (AbstractOutputBuffer)outputBuffer;
> +        if (!blocking && !buf.supportsNonBlocking()) throw new
> IllegalStateException();
> +        this.blocking = blocking;
> +    }
>  }
> 
> Modified:
> tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/Abs
> tractAjpProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> (original)
> +++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> Fri Jul  6 06:53:52 2012
> @@ -455,6 +455,8 @@ public abstract class AbstractAjpProcess
>              ((AtomicBoolean)
> param).set(asyncStateMachine.isAsyncDispatching());
>          } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
>              ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
> +        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION) {
> +            ((AtomicBoolean)
> param).set(asyncStateMachine.isAsyncOperation());
>          } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
>              ((AtomicBoolean)
> param).set(asyncStateMachine.isAsyncTimingOut());
>          } else if (actionCode == ActionCode.UPGRADE) {
> 
> Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/Ajp
> NioProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
> (original)
> +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Fri Jul
> 6 06:53:52 2012
> @@ -252,8 +252,7 @@ public class AjpNioProcessor extends Abs
> 
>          if (actionCode == ActionCode.ASYNC_COMPLETE) {
>              if (asyncStateMachine.asyncComplete()) {
> -                ((NioEndpoint)endpoint).processSocket(this.socket,
> -                        SocketStatus.OPEN_READ, false);
> +                ((NioEndpoint)endpoint).dispatchForEvent(socket,
> SocketStatus.OPEN_READ, false);
>              }
>          } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
>              if (param == null) return;
> @@ -264,11 +263,9 @@ public class AjpNioProcessor extends Abs
>              }
>          } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
>              if (asyncStateMachine.asyncDispatch()) {
> -                ((NioEndpoint)endpoint).processSocket(this.socket,
> -                        SocketStatus.OPEN_READ, true);
> +                ((NioEndpoint)endpoint).dispatchForEvent(socket,
> SocketStatus.OPEN_READ, true);            }
>              }
>          }
> -    }
> 
> 
>      // ------------------------------------------------------ Protected
> Methods
> 
> Modified:
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> AbstractHttp11Processor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> ---
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> (original)
> +++
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> Fri Jul  6 06:53:52 2012
> @@ -832,6 +832,8 @@ public abstract class AbstractHttp11Proc
>              ((AtomicBoolean)
> param).set(asyncStateMachine.isAsyncDispatching());
>          } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
>              ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
> +        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION) {
> +            ((AtomicBoolean)
> param).set(asyncStateMachine.isAsyncOperation());
>          } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
>              ((AtomicBoolean)
> param).set(asyncStateMachine.isAsyncTimingOut());
>          } else if (actionCode == ActionCode.UPGRADE) {
> 
> Modified:
> tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> AbstractOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> (original)
> +++ tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> Fri Jul  6 06:53:52 2012
> @@ -546,4 +546,9 @@ public abstract class AbstractOutputBuff
>          }
>      }
> 
> +    // --------------------------------------------------------- Public
> Methods
> +
> +
> +    public abstract boolean supportsNonBlocking();
> +
>  }
> 
> Modified:
> tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> Http11NioProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> (original)
> +++ tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> Fri Jul  6 06:53:52 2012
> @@ -20,9 +20,11 @@ import java.io.IOException;
>  import java.io.InterruptedIOException;
>  import java.net.InetAddress;
>  import java.nio.channels.SelectionKey;
> +import java.util.concurrent.atomic.AtomicBoolean;
> 
>  import javax.net.ssl.SSLEngine;
>  import javax.servlet.ReadListener;
> +import javax.servlet.WriteListener;
> 
>  import org.apache.coyote.ActionCode;
>  import org.apache.coyote.RequestInfo;
> @@ -102,19 +104,7 @@ public class Http11NioProcessor extends
>       */
>      protected SocketWrapper<NioChannel> socket = null;
> 
> -    /**
> -     * TODO SERVLET 3.1
> -     */
> -    protected ReadListener readListener;
> -
> -    public ReadListener getReadListener() {
> -        return readListener;
> -    }
> -
> -    public void setReadListener(ReadListener listener) {
> -        readListener = listener;
> -    }
> -
> +    protected volatile boolean wantOnWritePossible = false;
> 
>      // --------------------------------------------------------- Public
> Methods
> 
> @@ -185,6 +175,84 @@ public class Http11NioProcessor extends
>      }
> 
> 
> +
> +
> +    @Override
> +    public SocketState asyncDispatch(SocketStatus status) {
> +        final NioEndpoint.KeyAttachment attach =
> (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> +
> +
> +        if (status == SocketStatus.OPEN_WRITE) {
> +            try {
> +                asyncStateMachine.asyncOperation();
> +                try {
> +                    if (outputBuffer.hasDataToWrite()) {
> +                        //System.out.println("Attempting data
> flush!!");
> +                        outputBuffer.flushBuffer(false);
> +                    }
> +                }catch (IOException x) {
> +                    if (log.isDebugEnabled()) log.debug("Unable to
> write async data.",x);
> +                    //TODO FIXME-- fix - so we can notify of error
> +                    return SocketState.CLOSED;
> +                }
> +                //return if we have more data to write
> +                if (isRegisteredForWrite(attach)) {
> +                    return SocketState.LONG;
> +                }
> +            }catch (IllegalStateException x) {
> +            }
> +        } else if (status == SocketStatus.OPEN_READ) {
> +            try {
> +                try {
> +                    if (inputBuffer.nbRead()>0) {
> +                        asyncStateMachine.asyncOperation();
> +                    }
> +                }catch (IOException x) {
> +                    if (log.isDebugEnabled()) log.debug("Unable to read
> async data.",x);
> +                  //TODO FIXME-- fix - so we can notify of error
> +                    return SocketState.CLOSED;
> +                }
> +                //return if we have more data to write
> +            }catch (IllegalStateException x) {
> +            }
> +        }
> +
> +        SocketState state = super.asyncDispatch(status);
> +        //return if we have more data to write
> +        if (isRegisteredForWrite(attach)) {
> +            return SocketState.LONG;
> +        } else {
> +            return state;
> +        }
> +    }
> +
> +
> +
> +    @Override
> +    public SocketState process(SocketWrapper<NioChannel> socketWrapper)
> throws IOException {
> +        SocketState state = super.process(socketWrapper);
> +        final NioEndpoint.KeyAttachment attach =
> (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> +        //return if we have more data to write
> +        if (isRegisteredForWrite(attach)) {
> +            return SocketState.LONG;
> +        } else {
> +            return state;
> +        }
> +    }
> +
> +
> +
> +
> +    protected boolean isRegisteredForWrite(KeyAttachment attach) {
> +        //return if we have more data to write
> +        if (outputBuffer.hasDataToWrite()) {
> +            attach.interestOps(SelectionKey.OP_WRITE);
> +            return true;
> +        } else {
> +            return false;
> +        }
> +    }
> +
>      @Override
>      protected void resetTimeouts() {
>          final NioEndpoint.KeyAttachment attach =
> (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> @@ -305,11 +373,14 @@ public class Http11NioProcessor extends
>      }
> 
> 
> +
> +
>      @Override
>      public void recycleInternal() {
>          socket = null;
>          comet = false;
>          sendfileData = null;
> +        wantOnWritePossible = false;
>      }
> 
> 
> @@ -492,8 +563,7 @@ public class Http11NioProcessor extends
>              }
>          } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
>              if (asyncStateMachine.asyncComplete()) {
> -
> ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
> -                        SocketStatus.OPEN_READ, true);
> +
> ((NioEndpoint)endpoint).dispatchForEvent(this.socket.getSocket(),SocketS
> tatus.OPEN_READ, true);
>              }
>          } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
>              if (param==null) {
> @@ -508,14 +578,15 @@ public class Http11NioProcessor extends
>              attach.setTimeout(timeout);
>          } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
>              if (asyncStateMachine.asyncDispatch()) {
> -
> ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
> -                        SocketStatus.OPEN_READ, true);
> +
> ((NioEndpoint)endpoint).dispatchForEvent(this.socket.getSocket(),SocketS
> tatus.OPEN_READ, true);
>              }
> -        } else if (actionCode ==
> ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
> -            asyncStateMachine.asyncOperation();
>          } else if (actionCode == ActionCode.SET_READ_LISTENER) {
>              ReadListener listener = (ReadListener)param;
>              request.setReadListener(listener);
> +        } else if (actionCode == ActionCode.SET_WRITE_LISTENER) {
> +            WriteListener listener = (WriteListener)param;
> +            response.setWriteListener(listener);
> +            outputBuffer.setBlocking(listener==null);
>          } else if (actionCode == ActionCode.NB_READ_INTEREST) {
>              if (socket==null ||
> socket.getSocket().getAttachment(false)==null) {
>                  return;
> @@ -524,8 +595,27 @@ public class Http11NioProcessor extends
>              if (rp.getStage() ==
> org.apache.coyote.Constants.STAGE_SERVICE) {
>                  NioEndpoint.KeyAttachment attach =
> (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
>                  attach.interestOps(attach.interestOps() |
> SelectionKey.OP_READ);
> +            } else {
> +                throw new IllegalStateException("Calling isReady
> asynchronously is illegal.");
>              }
> -
> +        } else if (actionCode == ActionCode.NB_WRITE_INTEREST) {
> +            if (socket==null ||
> socket.getSocket().getAttachment(false)==null) {
> +                return;
> +        }
> +            AtomicBoolean canWrite = (AtomicBoolean)param;
> +            RequestInfo rp = request.getRequestProcessor();
> +            if (rp.getStage() ==
> org.apache.coyote.Constants.STAGE_SERVICE) {
> +                if (outputBuffer.isWritable()) {
> +                    canWrite.set(true);
> +                } else {
> +                    canWrite.set(false);
> +                    wantOnWritePossible = true;
> +    }
> +            } else {
> +                throw new IllegalStateException("Calling canWrite
> asynchronously is illegal.");
> +            }
> +        } else if (actionCode ==
> ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
> +            asyncStateMachine.asyncOperation();
>          }
>      }
> 
> 
> Modified:
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> InternalAprOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> ---
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> (original)
> +++
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> Fri Jul  6 06:53:52 2012
> @@ -86,6 +86,11 @@ public class InternalAprOutputBuffer ext
>      // --------------------------------------------------------- Public
> Methods
> 
>      @Override
> +    public boolean supportsNonBlocking() {
> +        return false;
> +    }
> +
> +    @Override
>      public void init(SocketWrapper<Long> socketWrapper,
>              AbstractEndpoint endpoint) throws IOException {
> 
> 
> Modified:
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> InternalNioOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> ---
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> (original)
> +++
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> Fri Jul  6 06:53:52 2012
> @@ -21,6 +21,12 @@ import java.io.IOException;
>  import java.nio.ByteBuffer;
>  import java.nio.channels.SelectionKey;
>  import java.nio.channels.Selector;
> +import java.util.Collections;
> +import java.util.Iterator;
> +import java.util.LinkedList;
> +import java.util.concurrent.LinkedBlockingDeque;
> +import java.util.concurrent.atomic.AtomicBoolean;
> +import java.util.concurrent.atomic.AtomicInteger;
> 
>  import org.apache.coyote.OutputBuffer;
>  import org.apache.coyote.Response;
> @@ -76,9 +82,80 @@ public class InternalNioOutputBuffer ext
>       */
>      private NioSelectorPool pool;
> 
> +    /**
> +     * Flag used only for Comet requests/responses
> +     */
> +    protected volatile boolean blocking = true;
> +
> +    /**
> +     * Track if the byte buffer is flipped
> +     */
> +    protected volatile boolean flipped = false;
> +
> +    /**
> +     * For "non-blocking" writes use an external buffer
> +     */
> +    protected volatile LinkedBlockingDeque<ByteBufferHolder>
> bufferedWrite = null;
> +
> +    /**
> +     * The max size of the buffered write buffer
> +     */
> +    protected int bufferedWriteSize = 64*1024; //64k default write
> buffer
> +
> +    /**
> +     * Number of bytes last written
> +     */
> +    protected AtomicInteger lastWrite = new AtomicInteger(1);
> +
> +    protected class ByteBufferHolder {
> +        private ByteBuffer buf;
> +        private AtomicBoolean flipped;
> +        public ByteBufferHolder(ByteBuffer buf, boolean flipped) {
> +           this.buf = buf;
> +           this.flipped = new AtomicBoolean(flipped);
> +        }
> +        public ByteBuffer getBuf() {
> +            return buf;
> +        }
> +        public boolean isFlipped() {
> +            return flipped.get();
> +        }
> +
> +        public boolean flip() {
> +            if (flipped.compareAndSet(false, true)) {
> +                buf.flip();
> +                return true;
> +            } else {
> +                return false;
> +            }
> +        }
> +
> +        public boolean hasData() {
> +            if (flipped.get()) {
> +                return buf.remaining()>0;
> +            } else {
> +                return buf.position()>0;
> +            }
> +        }
> +
> +        @Override
> +        public String toString() {
> +            StringBuilder builder = new
> StringBuilder(super.toString());
> +            builder.append("[flipped=");
> +            builder.append(isFlipped()?"true, remaining=" : "false,
> position=");
> +            builder.append(isFlipped()? buf.remaining():
> buf.position());
> +            builder.append("]");
> +            return builder.toString();
> +        }
> +
> +    }
> 
>      // --------------------------------------------------------- Public
> Methods
> 
> +    @Override
> +    public boolean supportsNonBlocking() {
> +        return true;
> +    }
> 
>      /**
>       * Flush the response.
> @@ -91,7 +168,7 @@ public class InternalNioOutputBuffer ext
> 
>          super.flush();
>          // Flush the current buffer
> -        flushBuffer();
> +        flushBuffer(isBlocking());
> 
>      }
> 
> @@ -107,6 +184,9 @@ public class InternalNioOutputBuffer ext
>              socket.getBufHandler().getWriteBuffer().clear();
>              socket = null;
>          }
> +        lastWrite.set(1);
> +        setBlocking(true);
> +        flipped = false;
>      }
> 
> 
> @@ -118,7 +198,7 @@ public class InternalNioOutputBuffer ext
>      @Override
>      public void endRequest() throws IOException {
>          super.endRequest();
> -        flushBuffer();
> +        flushBuffer(true);
>      }
> 
>      // ------------------------------------------------ HTTP/1.1 Output
> Methods
> @@ -146,8 +226,12 @@ public class InternalNioOutputBuffer ext
>       * @throws IOException
>       * TODO Fix non blocking write properly
>       */
> +    int total = 0;
>      private synchronized int writeToSocket(ByteBuffer bytebuffer,
> boolean block, boolean flip) throws IOException {
> -        if ( flip ) bytebuffer.flip();
> +        if ( flip ) {
> +            bytebuffer.flip();
> +            flipped = true;
> +        }
> 
>          int written = 0;
>          NioEndpoint.KeyAttachment att =
> (NioEndpoint.KeyAttachment)socket.getAttachment(false);
> @@ -168,7 +252,14 @@ public class InternalNioOutputBuffer ext
>          }finally {
>              if ( selector != null ) pool.put(selector);
>          }
> -        if ( block ) bytebuffer.clear(); //only clear
> +        if ( block || bytebuffer.remaining()==0) {
> +            //blocking writes must empty the buffer
> +            //and if remaining==0 then we did empty it
> +            bytebuffer.clear();
> +            flipped = false;
> +        }
> +        total+= written;
> +        //System.out.println("Successful write("+written+ " / "+total);
>          return written;
>      }
> 
> @@ -204,31 +295,48 @@ public class InternalNioOutputBuffer ext
> 
>      }
> 
> +
>      private synchronized void addToBB(byte[] buf, int offset, int
> length) throws IOException {
> -        while (length > 0) {
> -            int thisTime = length;
> -            if (socket.getBufHandler().getWriteBuffer().position() ==
> -                    socket.getBufHandler().getWriteBuffer().capacity()
> -                    ||
> socket.getBufHandler().getWriteBuffer().remaining()==0) {
> -                flushBuffer();
> -            }
> -            if (thisTime >
> socket.getBufHandler().getWriteBuffer().remaining()) {
> -                thisTime =
> socket.getBufHandler().getWriteBuffer().remaining();
> -            }
> -            socket.getBufHandler().getWriteBuffer().put(buf, offset,
> thisTime);
> +        //try to write to socket first
> +        if (length==0) return;
> +
> +        boolean dataLeft = flushBuffer(isBlocking());
> +
> +        while (!dataLeft && length>0) {
> +            int thisTime =
> transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
>              length = length - thisTime;
>              offset = offset + thisTime;
> +            writeToSocket(socket.getBufHandler().getWriteBuffer(),
> isBlocking(), true);
> +            dataLeft = flushBuffer(isBlocking());
>          }
> +
>          NioEndpoint.KeyAttachment ka =
> (NioEndpoint.KeyAttachment)socket.getAttachment(false);
>          if ( ka!= null ) ka.access();//prevent timeouts for just doing
> client writes
> +
> +        if (!isBlocking() && length>0) {
> +            //we must buffer as long as it fits
> +            //ByteBufferHolder tail = bufferedWrite.
> +            addToBuffers(buf, offset, length);
> +    }
> +    }
> +
> +    private void addToBuffers(byte[] buf, int offset, int length) {
> +        ByteBufferHolder holder = bufferedWrite.peekLast();
> +        if (holder==null || holder.isFlipped() ||
> holder.getBuf().remaining()<length) {
> +            ByteBuffer buffer =
> ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
> +            holder = new ByteBufferHolder(buffer,false);
> +            bufferedWrite.add(holder);
> +        }
> +        holder.getBuf().put(buf,offset,length);
>      }
> 
> 
>      /**
>       * Callback to write data from the buffer.
>       */
> -    private void flushBuffer() throws IOException {
> +    protected boolean flushBuffer(boolean block) throws IOException {
> 
> +        int result = 0;
>          //prevent timeout for async,
>          SelectionKey key =
> socket.getIOChannel().keyFor(socket.getPoller().getSelector());
>          if (key != null) {
> @@ -236,16 +344,61 @@ public class InternalNioOutputBuffer ext
>              attach.access();
>          }
> 
> +        boolean dataLeft = hasMoreDataToFlush();
> +
>          //write to the socket, if there is anything to write
> -        if (socket.getBufHandler().getWriteBuffer().position() > 0) {
> -            socket.getBufHandler().getWriteBuffer().flip();
> -            writeToSocket(socket.getBufHandler().getWriteBuffer(),true,
> false);
> +        if ( dataLeft ) {
> +            result =
> writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped);
>          }
> +
> +        dataLeft = hasMoreDataToFlush();
> +
> +        if (!dataLeft && bufferedWrite!=null) {
> +            Iterator<ByteBufferHolder> bufIter =
> bufferedWrite.iterator();
> +            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
> +                ByteBufferHolder buffer = bufIter.next();
> +                buffer.flip();
> +                while (!hasMoreDataToFlush() &&
> buffer.getBuf().remaining()>0) {
> +                    transfer(buffer.getBuf(),
> socket.getBufHandler().getWriteBuffer());
> +                    if (buffer.getBuf().remaining() == 0) {
> +                        bufIter.remove();
>      }
> +                    result =
> writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
> +                    //here we must break if we didn't finish the write
> 
> +                }
> +            }
> +        }
> +
> +        dataLeft = hasMoreDataToFlush();
> +
> +        return dataLeft;
> +    }
> +
> +    private boolean hasMoreDataToFlush() {
> +        return (flipped &&
> socket.getBufHandler().getWriteBuffer().remaining()>0) ||
> +        (!flipped && socket.getBufHandler().getWriteBuffer().position()
> > 0);
> +    }
> +
> +    private int transfer(byte[] from, int offset, int length,
> ByteBuffer to) {
> +        int max = Math.min(length, to.remaining());
> +        ByteBuffer tmp = ByteBuffer.wrap(from, offset, max);
> +        tmp.limit (tmp.position() + max);
> +        to.put (tmp);
> +        return max;
> +    }
> +
> +    private int transfer(ByteBuffer from, ByteBuffer to) {
> +        int max = Math.min(from.remaining(), to.remaining());
> +        ByteBuffer tmp = from.duplicate ();
> +        tmp.limit (tmp.position() + max);
> +        to.put (tmp);
> +        from.position(from.position() + max);
> +        return max;
> +    }
> 
> -    // ----------------------------------- OutputStreamOutputBuffer
> Inner Class
> 
> +    // ----------------------------------- OutputStreamOutputBuffer
> Inner Class
> 
>      /**
>       * This class is an output buffer which will write data to an
> output
> @@ -275,4 +428,44 @@ public class InternalNioOutputBuffer ext
>              return byteCount;
>          }
>      }
> +
> +    //----------------------------------------non blocking writes -----
> ------------
> +    public void setBlocking(boolean blocking) {
> +        this.blocking = blocking;
> +        if (blocking)
> +            bufferedWrite = null;
> +        else
> +            bufferedWrite = new
> LinkedBlockingDeque<ByteBufferHolder>();
> +}
> +
> +    public void setBufferedWriteSize(int bufferedWriteSize) {
> +        this.bufferedWriteSize = bufferedWriteSize;
> +    }
> +
> +    public boolean isBlocking() {
> +        return blocking;
> +    }
> +
> +    private boolean hasBufferedData() {
> +        boolean result = false;
> +        if (bufferedWrite!=null) {
> +            Iterator<ByteBufferHolder> iter = bufferedWrite.iterator();
> +            while (!result && iter.hasNext()) {
> +                result = iter.next().hasData();
> +            }
> +        }
> +        return result;
> +    }
> +
> +    public boolean hasDataToWrite() {
> +        return hasMoreDataToFlush() || hasBufferedData();
> +    }
> +
> +    public int getBufferedWriteSize() {
> +        return bufferedWriteSize;
> +    }
> +
> +    public boolean isWritable() {
> +        return !hasDataToWrite();
> +    }
>  }
> 
> Modified:
> tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> InternalOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
> (original)
> +++ tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
> Fri Jul  6 06:53:52 2012
> @@ -96,6 +96,11 @@ public class InternalOutputBuffer extend
>      // --------------------------------------------------------- Public
> Methods
> 
>      @Override
> +    public boolean supportsNonBlocking() {
> +        return false;
> +    }
> +
> +    @Override
>      public void init(SocketWrapper<Socket> socketWrapper,
>              AbstractEndpoint endpoint) throws IOException {
> 
> 
> Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/Sp
> dyProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
> (original)
> +++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Fri Jul
> 6 06:53:52 2012
> @@ -386,6 +386,8 @@ public class SpdyProcessor extends Abstr
>              ((AtomicBoolean)
> param).set(asyncStateMachine.isAsyncDispatching());
>          } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
>              ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
> +        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION) {
> +            ((AtomicBoolean)
> param).set(asyncStateMachine.isAsyncOperation());
>          } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
>              ((AtomicBoolean)
> param).set(asyncStateMachine.isAsyncTimingOut());
>          } else {
> 
> Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/ne
> t/NioEndpoint.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
> (original)
> +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Fri
> Jul  6 06:53:52 2012
> @@ -716,7 +716,16 @@ public class NioEndpoint extends Abstrac
>          return true;
>      }
> 
> -    public boolean processSocket(NioChannel socket, SocketStatus
> status, boolean dispatch) {
> +    public boolean dispatchForEvent(NioChannel socket, SocketStatus
> status, boolean dispatch) {
> +        if (!dispatch) {
> +            processSocket(socket,status,dispatch);
> +        } else {
> +            socket.getPoller().add(socket, OP_CALLBACK);
> +        }
> +        return true;
> +    }
> +
> +    protected boolean processSocket(NioChannel socket, SocketStatus
> status, boolean dispatch) {
>          try {
>              KeyAttachment attachment =
> (KeyAttachment)socket.getAttachment(false);
>              if (attachment == null) {
> @@ -900,7 +909,7 @@ public class NioEndpoint extends Abstrac
>                          final KeyAttachment att = (KeyAttachment)
> key.attachment();
>                          if ( att!=null ) {
>                              //handle callback flag
> -                            if (att.getComet() && (interestOps &
> OP_CALLBACK) == OP_CALLBACK ) {
> +                            if ((interestOps & OP_CALLBACK) ==
> OP_CALLBACK ) {
>                                  att.setCometNotify(true);
>                              } else {
>                                  att.setCometNotify(false);
> @@ -910,7 +919,8 @@ public class NioEndpoint extends Abstrac
>                              //we are registering the key to start with,
> reset the fairness counter.
>                              int ops = key.interestOps() | interestOps;
>                              att.interestOps(ops);
> -                            key.interestOps(ops);
> +                            if (att.getCometNotify())
> key.interestOps(0);
> +                            else key.interestOps(ops);
>                          } else {
>                              cancel = true;
>                          }
> @@ -979,10 +989,6 @@ public class NioEndpoint extends Abstrac
>          public void cometInterest(NioChannel socket) {
>              KeyAttachment att =
> (KeyAttachment)socket.getAttachment(false);
>              add(socket,att.getCometOps());
> -            if ( (att.getCometOps()&OP_CALLBACK) == OP_CALLBACK ) {
> -                nextExpiration = 0; //force the check for faster
> callback
> -                selector.wakeup();
> -            }
>          }
> 
>          /**
> @@ -1001,6 +1007,9 @@ public class NioEndpoint extends Abstrac
>              PollerEvent r = eventCache.poll();
>              if ( r==null) r = new PollerEvent(socket,null,interestOps);
>              else r.reset(socket,null,interestOps);
> +            if ( (interestOps&OP_CALLBACK) == OP_CALLBACK ) {
> +                nextExpiration = 0; //force the check for faster
> callback
> +            }
>              addEvent(r);
>              if (close) {
>                  processSocket(socket, SocketStatus.STOP, false);
> @@ -1256,7 +1265,7 @@ public class NioEndpoint extends Abstrac
> 
>                                  boolean readAndWrite = sk.isReadable()
> && sk.isWritable();
>                                  reg(sk, attachment, 0);
> -                                if (readAndWrite) {
> +                                if (attachment.isAsync() &&
> readAndWrite) {
>                                      //remember that we want to know
> about write too
> 
> attachment.interestOps(SelectionKey.OP_WRITE);
>                                  }
> @@ -1406,7 +1415,7 @@ public class NioEndpoint extends Abstrac
>              // - the selector simply timed out (suggests there isn't
> much load)
>              // - the nextExpiration time has passed
>              // - the server socket is being closed
> -            if ((keyCount > 0 || hasEvents) && (now < nextExpiration)
> && !close) {
> +            if (nextExpiration > 0 && (keyCount > 0 || hasEvents) &&
> (now < nextExpiration) && !close) {
>                  return;
>              }
>              //timeout
> @@ -1421,10 +1430,11 @@ public class NioEndpoint extends Abstrac
>                          cancelledKey(key, SocketStatus.ERROR); //we
> don't support any keys without attachments
>                      } else if ( ka.getError() ) {
>                          cancelledKey(key, SocketStatus.ERROR);//TODO
> this is not yet being used
> -                    } else if (ka.getComet() && ka.getCometNotify() ) {
> +                    } else if (ka.getCometNotify() ) {
>                          ka.setCometNotify(false);
> -                        reg(key,ka,0);//avoid multiple calls, this gets
> reregistered after invocation
> -                        //if (!processSocket(ka.getChannel(),
> SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(),
> SocketStatus.DISCONNECT);
> +                        int ops = ka.interestOps() & ~OP_CALLBACK;;
> +                        reg(key,ka,0);//avoid multiple calls, this gets
> re-registered after invocation
> +                        ka.interestOps(ops);
>                          if (!processSocket(ka.getChannel(),
> SocketStatus.OPEN_READ, true)) processSocket(ka.getChannel(),
> SocketStatus.DISCONNECT, true);
>                      } else if ((ka.interestOps()&SelectionKey.OP_READ)
> == SelectionKey.OP_READ ||
>                                (ka.interestOps()&SelectionKey.OP_WRITE)
> == SelectionKey.OP_WRITE) {
> 
> Modified:
> tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/ne
> t/SecureNioChannel.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
> (original)
> +++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
> Fri Jul  6 06:53:52 2012
> @@ -478,7 +478,7 @@ public class SecureNioChannel extends Ni
>              int written = sc.write(src);
>              return written;
>          } else {
> -            //make sure we can handle expand, and that we only use on
> buffer
> +            //make sure we can handle expand, and that we only use one
> buffer
>              if ( (!this.isSendFile()) && (src !=
> bufHandler.getWriteBuffer()) ) throw new IllegalArgumentException("You
> can only write using the application write buffer provided by the
> handler.");
>              //are we closing or closed?
>              if ( closing || closed) throw new IOException("Channel is
> in closing state.");
> 
> Modified:
> tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.jav
> a
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonbl
> ocking/TestNonBlockingAPI.java?rev=1358055&r1=1358054&r2=1358055&view=di
> ff
> ========================================================================
> ======
> ---
> tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.jav
> a (original)
> +++
> tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.jav
> a Fri Jul  6 06:53:52 2012
> @@ -17,10 +17,10 @@
>  package org.apache.catalina.nonblocking;
> 
>  import java.io.IOException;
> +import java.util.Arrays;
>  import java.util.HashMap;
>  import java.util.List;
>  import java.util.Map;
> -import java.util.Random;
> 
>  import javax.servlet.AsyncContext;
>  import javax.servlet.AsyncEvent;
> @@ -28,14 +28,12 @@ import javax.servlet.AsyncListener;
>  import javax.servlet.ReadListener;
>  import javax.servlet.ServletException;
>  import javax.servlet.ServletInputStream;
> +import javax.servlet.ServletOutputStream;
>  import javax.servlet.WriteListener;
>  import javax.servlet.annotation.WebServlet;
>  import javax.servlet.http.HttpServletRequest;
>  import javax.servlet.http.HttpServletResponse;
> 
> -import org.junit.Assert;
> -import org.junit.Test;
> -
>  import org.apache.catalina.Wrapper;
>  import org.apache.catalina.core.StandardContext;
>  import org.apache.catalina.startup.BytesStreamer;
> @@ -44,9 +42,14 @@ import org.apache.catalina.startup.Tomca
>  import org.apache.catalina.startup.TomcatBaseTest;
>  import org.apache.coyote.http11.Http11NioProtocol;
>  import org.apache.tomcat.util.buf.ByteChunk;
> +import org.apache.tomcat.util.buf.ByteChunk.ByteOutputChannel;
> +import org.junit.Assert;
> +import org.junit.Test;
> 
>  public class TestNonBlockingAPI extends TomcatBaseTest {
> 
> +    public static final long bytesToDownload = 1024 * 1024 * 5;
> +
>      @Override
>      protected String getProtocol() {
>          return Http11NioProtocol.class.getName();
> @@ -58,31 +61,89 @@ public class TestNonBlockingAPI extends
>      }
> 
>      @Test
> -    public void testOne() throws Exception {
> +    public void testNonBlockingRead() throws Exception {
>          // Configure a context with digest auth and a single protected
> resource
>          Tomcat tomcat = getTomcatInstance();
>          // Must have a real docBase - just use temp
> -        StandardContext ctx = (StandardContext)
> -            tomcat.addContext("",
> System.getProperty("java.io.tmpdir"));
> +        StandardContext ctx = (StandardContext) tomcat.addContext("",
> System.getProperty("java.io.tmpdir"));
> 
> -        NBTesterServlet servlet = new NBTesterServlet();
> -        String servletName = NBTesterServlet.class.getName();
> +        NBReadServlet servlet = new NBReadServlet();
> +        String servletName = NBReadServlet.class.getName();
>          Wrapper servletWrapper = tomcat.addServlet(ctx, servletName,
> servlet);
>          ctx.addServletMapping("/", servletName);
> 
>          tomcat.start();
> 
> -        Map<String,List<String>> resHeaders= new HashMap<String,
> List<String>>();
> -        int rc = postUrl(true, new DataWriter(),"http://localhost:" +
> getPort() + "/", new ByteChunk(),resHeaders, null);
> +        Map<String, List<String>> resHeaders = new HashMap<String,
> List<String>>();
> +        int rc = postUrl(true, new DataWriter(500), "http://localhost:"
> + getPort() + "/", new ByteChunk(),
> +                resHeaders, null);
>          Assert.assertEquals(HttpServletResponse.SC_OK, rc);
>      }
> 
> +    @Test
> +    public void testNonBlockingWrite() throws Exception {
> +        String bind = "localhost";
> +        // Configure a context with a single non-blocking write servlet
> +        Tomcat tomcat = getTomcatInstance();
> +        // Must have a real docBase - just use temp
> +        StandardContext ctx = (StandardContext) tomcat.addContext("",
> System.getProperty("java.io.tmpdir"));
> +
> +        NBWriteServlet servlet = new NBWriteServlet();
> +        String servletName = NBWriteServlet.class.getName();
> +        Wrapper servletWrapper = tomcat.addServlet(ctx, servletName,
> servlet);
> +        ctx.addServletMapping("/", servletName);
> +        tomcat.getConnector().setProperty("socket.txBufSize", "1024");
> +        tomcat.getConnector().setProperty("address", bind);
> +
> System.out.println(tomcat.getConnector().getProperty("address"));
> +        tomcat.start();
> +
> +        Map<String, List<String>> resHeaders = new HashMap<String,
> List<String>>();
> +        ByteChunk slowReader = new ByteChunk();
> +        slowReader.setLimit(1); // FIXME BUFFER IS BROKEN, 0 doesn't
> work
> +        slowReader.setByteOutputChannel(new ByteOutputChannel() {
> +            long counter = 0;
> +            long delta = 0;
> +
> +            @Override
> +            public void realWriteBytes(byte[] cbuf, int off, int len)
> throws IOException {
> +                try {
> +                    if (len == 0)
> +                        return;
> +                    counter += len;
> +                    delta += len;
> +                    if (counter > bytesToDownload) {
> +                        System.out.println("ERROR Downloaded more than
> expected ERROR");
> +                    } else if (counter == bytesToDownload) {
> +                        System.out.println("Download complete(" +
> bytesToDownload + " bytes)");
> +                        // } else if (counter > (1966086)) {
> +                        // System.out.println("Download almost
> complete, missing bytes ("+counter+")");
> +                    } else if (delta > (bytesToDownload / 16)) {
> +                        System.out.println("Read " + counter + "
> bytes.");
> +                        delta = 0;
> +                        Thread.currentThread().sleep(500);
> +                    }
> +                } catch (Exception x) {
> +                    throw new IOException(x);
> +                }
> +            }
> +        });
> +        int rc = postUrl(true, new DataWriter(0), "http://" + bind +
> ":" + getPort() + "/", slowReader, resHeaders,
> +                null);
> +        slowReader.flushBuffer();
> +        Assert.assertEquals(HttpServletResponse.SC_OK, rc);
> +    }
> 
>      public static class DataWriter implements BytesStreamer {
>          final int max = 5;
>          int count = 0;
> +        long delay = 0;
>          byte[] b = "WANTMORE".getBytes();
>          byte[] f = "FINISHED".getBytes();
> +
> +        public DataWriter(long delay) {
> +            this.delay = delay;
> +        }
> +
>          @Override
>          public int getLength() {
>              return b.length * max;
> @@ -90,7 +151,7 @@ public class TestNonBlockingAPI extends
> 
>          @Override
>          public int available() {
> -            if (count<max) {
> +            if (count < max) {
>                  return b.length;
>              } else {
>                  return 0;
> @@ -100,9 +161,14 @@ public class TestNonBlockingAPI extends
>          @Override
>          public byte[] next() {
>              if (count < max) {
> -                if (count>0) try {Thread.sleep(6000);}catch(Exception
> x){}
> +                if (count > 0)
> +                    try {
> +                        if (delay > 0)
> +                            Thread.sleep(delay);
> +                    } catch (Exception x) {
> +                    }
>                  count++;
> -                if (count<max)
> +                if (count < max)
>                      return b;
>                  else
>                      return f;
> @@ -113,12 +179,12 @@ public class TestNonBlockingAPI extends
> 
>      }
> 
> -    @WebServlet(asyncSupported=true)
> -    public static class NBTesterServlet extends TesterServlet {
> +    @WebServlet(asyncSupported = true)
> +    public static class NBReadServlet extends TesterServlet {
> 
>          @Override
>          protected void service(HttpServletRequest req,
> HttpServletResponse resp) throws ServletException, IOException {
> -            //step 1 - start async
> +            // step 1 - start async
>              AsyncContext actx = req.startAsync();
>              actx.setTimeout(Long.MAX_VALUE);
>              actx.addListener(new AsyncListener() {
> @@ -147,7 +213,7 @@ public class TestNonBlockingAPI extends
> 
>                  }
>              });
> -            //step 2 - notify on read
> +            // step 2 - notify on read
>              ServletInputStream in = req.getInputStream();
>              ReadListener rlist = new TestReadListener(actx);
>              in.setReadListener(rlist);
> @@ -155,17 +221,12 @@ public class TestNonBlockingAPI extends
>              while (in.isReady()) {
>                  rlist.onDataAvailable();
>              }
> -            //step 3 - notify that we wish to read
> -            //ServletOutputStream out = resp.getOutputStream();
> -            //out.setWriteListener(new TestWriteListener(actx));
> +            // step 3 - notify that we wish to read
> +            // ServletOutputStream out = resp.getOutputStream();
> +            // out.setWriteListener(new TestWriteListener(actx));
> 
>          }
> 
> -        @Override
> -        protected void doPost(HttpServletRequest req,
> HttpServletResponse resp) throws ServletException, IOException {
> -            doGet(req,resp);
> -        }
> -
>          private class TestReadListener implements ReadListener {
>              AsyncContext ctx;
> 
> @@ -177,9 +238,9 @@ public class TestNonBlockingAPI extends
>              public void onDataAvailable() {
>                  try {
>                      ServletInputStream in =
> ctx.getRequest().getInputStream();
> -                    int avail=0;
> +                    int avail = 0;
>                      String s = "";
> -                    while ((avail=in.dataAvailable()) > 0) {
> +                    while ((avail = in.dataAvailable()) > 0) {
>                          byte[] b = new byte[avail];
>                          in.read(b);
>                          s += new String(b);
> @@ -191,7 +252,7 @@ public class TestNonBlockingAPI extends
>                      } else {
>                          in.isReady();
>                      }
> -                }catch (Exception x) {
> +                } catch (Exception x) {
>                      x.printStackTrace();
>                      ctx.complete();
>                  }
> @@ -212,9 +273,62 @@ public class TestNonBlockingAPI extends
>              }
>          }
> 
> -        private class TestWriteListener implements WriteListener {
> +    }
> +
> +    @WebServlet(asyncSupported = true)
> +    public static class NBWriteServlet extends TesterServlet {
> 
> +        @Override
> +        protected void service(HttpServletRequest req,
> HttpServletResponse resp) throws ServletException, IOException {
> +            // step 1 - start async
> +            AsyncContext actx = req.startAsync();
> +            actx.setTimeout(Long.MAX_VALUE);
> +            actx.addListener(new AsyncListener() {
> +
> +                @Override
> +                public void onTimeout(AsyncEvent event) throws
> IOException {
> +                    System.out.println("onTimeout");
> +
> +                }
> +
> +                @Override
> +                public void onStartAsync(AsyncEvent event) throws
> IOException {
> +                    System.out.println("onStartAsync");
> +
> +                }
> +
> +                @Override
> +                public void onError(AsyncEvent event) throws
> IOException {
> +                    System.out.println("onError");
> +
> +                }
> +
> +                @Override
> +                public void onComplete(AsyncEvent event) throws
> IOException {
> +                    System.out.println("onComplete");
> +
> +                }
> +            });
> +            // step 2 - notify on read
> +            // ServletInputStream in = req.getInputStream();
> +            // ReadListener rlist = new TestReadListener(actx);
> +            // in.setReadListener(rlist);
> +            //
> +            // while (in.isReady()) {
> +            // rlist.onDataAvailable();
> +            // }
> +            // step 3 - notify that we wish to write
> +            ServletOutputStream out = resp.getOutputStream();
> +            resp.setBufferSize(200 * 1024);
> +            TestWriteListener listener = new TestWriteListener(actx);
> +            out.setWriteListener(listener);
> +            listener.onWritePossible();
> +        }
> +
> +        private class TestWriteListener implements WriteListener {
> +            long chunk = 1024 * 1024;
>              AsyncContext ctx;
> +            long bytesToDownload = TestNonBlockingAPI.bytesToDownload;
> 
>              public TestWriteListener(AsyncContext ctx) {
>                  this.ctx = ctx;
> @@ -222,36 +336,40 @@ public class TestNonBlockingAPI extends
> 
>              @Override
>              public void onWritePossible() {
> -                // TODO Auto-generated method stub
> +                System.out.println("onWritePossible");
> +                try {
> +                    long left = Math.max(bytesToDownload, 0);
> +                    long start = System.currentTimeMillis();
> +                    long end = System.currentTimeMillis();
> +                    long before = left;
> +                    while (left > 0 &&
> ctx.getResponse().getOutputStream().canWrite()) {
> +                        byte[] b = new byte[(int) Math.min(chunk,
> bytesToDownload)];
> +                        Arrays.fill(b, (byte) 'X');
> +                        ctx.getResponse().getOutputStream().write(b);
> +                        bytesToDownload -= b.length;
> +                        left = Math.max(bytesToDownload, 0);
> +                    }
> +                    System.out
> +                            .println("Write took:" + (end - start) + "
> ms. Bytes before=" + before + " after=" + left);
> +                    // only call complete if we have emptied the buffer
> +                    if (left == 0 &&
> ctx.getResponse().getOutputStream().canWrite()) {
> +                        // it is illegal to call complete
> +                        // if there is a write in progress
> +                        ctx.complete();
> +                    }
> +                } catch (Exception x) {
> +                    x.printStackTrace();
> +                }
> 
>              }
> 
>              @Override
>              public void onError(Throwable throwable) {
> -                // TODO Auto-generated method stub
> -
> +                System.out.println("onError");
> +                throwable.printStackTrace();
>              }
> 
>          }
> 
> -
> -
> -    }
> -
> -
> -    private static class StockQuotes {
> -        public StockQuotes() {
> -            super();
> -        }
> -        Random r = new Random(System.currentTimeMillis());
> -
> -        public String getNextQuote() {
> -            return String.format("VMW: $%10.0f", r.nextDouble());
> -        }
> -
> -
> -
>      }
> -
> -
>  }
> 
> Modified:
> tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/start
> up/TomcatBaseTest.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> ========================================================================
> ======
> --- tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
> (original)
> +++ tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
> Fri Jul  6 06:53:52 2012
> @@ -49,6 +49,7 @@ import org.apache.catalina.core.AprLifec
>  import org.apache.catalina.core.StandardServer;
>  import org.apache.catalina.session.StandardManager;
>  import org.apache.catalina.valves.AccessLogValve;
> +import org.apache.coyote.http11.Http11NioProtocol;
>  import org.apache.tomcat.util.buf.ByteChunk;
> 
>  /**
> @@ -141,9 +142,9 @@ public abstract class TomcatBaseTest ext
>          // Has a protocol been specified
>          String protocol = System.getProperty("tomcat.test.protocol");
> 
> -        // Use BIO by default
> +        // Use NIO by default in Tomcat 8
>          if (protocol == null) {
> -            protocol = "org.apache.coyote.http11.Http11Protocol";
> +            protocol = Http11NioProtocol.class.getName();
>          }
> 
>          return protocol;
> 
> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
> For additional commands, e-mail: dev-help@tomcat.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: svn commit: r1358055 - in /tomcat/trunk: java/org/apache/catalina/connector/ java/org/apache/catalina/core/ java/org/apache/coyote/ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/ java/org/apache/tomcat/util/net

Posted by Mark Thomas <ma...@apache.org>.
On 07/07/2012 22:06, Mark Thomas wrote:
> On 07/07/2012 14:58, Mark Thomas wrote:
>> On 06/07/2012 17:35, Filip Hanik (mailing lists) wrote:
>>> Turning on Java 7 does change the test landscape.
>>> Right now, I can get the test suite to run fine on Java 6, but lots of errors on Java 7
>>
>> I've tracked down the first of these issues. Calling
>> java.nio.channels.SocketChannel.socket().setTrafficClass()
>> after
>> socketChannel.connect()
>>
>> has been called triggers a SocketException. Moving the setTrafficClass()
>> call to before connect() fixes the issue.
>>
>> Tests on Java 6 show that the setTrafficClass() was having no effect if
>> called after connect(). It looks like Java 7 has started to throw an
>> exception when the setter has no effect.
>>
>> On this basis, I'd say that this particular issue is a long-standing bug
>> that was masked by Java 6 and is now visible with Java 7.
>>
>> I'll do a code review and move the setTrafficClass() calls as necessary.
> 
> Having done all of this, some further research on the users list
> suggests that this is a Windows XP / Server 2003 specific issue.

Tests on Linux show that calls to setTrafficClass() after accept() have
no effect. On this basis, I believe removing the option to set the
traffic class in those cases was correct.

> That means the code I deleted in the accept() case almost certainly
> needs to be restored.

The deletion stands.

I currently see no test failures on Windows. Linux testing not yet complete.

Mark

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: svn commit: r1358055 - in /tomcat/trunk: java/org/apache/catalina/connector/ java/org/apache/catalina/core/ java/org/apache/coyote/ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/ java/org/apache/tomcat/util/net

Posted by Mark Thomas <ma...@apache.org>.
On 07/07/2012 14:58, Mark Thomas wrote:
> On 06/07/2012 17:35, Filip Hanik (mailing lists) wrote:
>> Turning on Java 7 does change the test landscape.
>> Right now, I can get the test suite to run fine on Java 6, but lots of errors on Java 7
> 
> Hmm. Not good (that it worked with Java 6 but not Java 7).
> 
> 
> I've tracked down the first of these issues. Calling
> 
> java.nio.channels.SocketChannel.socket().setTrafficClass()
> 
> after
> 
> socketChannel.connect()
> 
> has been called triggers a SocketException. Moving the setTrafficClass()
> call to before connect() fixes the issue.
> 
> Tests on Java 6 show that the setTrafficClass() was having no effect if
> called after connect(). It looks like Java 7 has started to throw an
> exception when the setter has no effect.
> 
> On this basis, I'd say that this particular issue is a long-standing bug
> that was masked by Java 6 and is now visible with Java 7.
> 
> I'll do a code review and move the setTrafficClass() calls as necessary.

Having done all of this, some further research on the users list
suggests that this is a Windows XP / Server 2003 specific issue.

That means the code I deleted in the accept() case almost certainly
needs to be restored.

I have some ideas for how this might be handled but I want to check a
few things first. I should have something some time tomorrow.

Mark

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: svn commit: r1358055 - in /tomcat/trunk: java/org/apache/catalina/connector/ java/org/apache/catalina/core/ java/org/apache/coyote/ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/ java/org/apache/tomcat/util/net

Posted by Mark Thomas <ma...@apache.org>.
On 06/07/2012 17:35, Filip Hanik (mailing lists) wrote:
> Turning on Java 7 does change the test landscape.
> Right now, I can get the test suite to run fine on Java 6, but lots of errors on Java 7

Hmm. Not good (that it worked with Java 6 but not Java 7).


I've tracked down the first of these issues. Calling

java.nio.channels.SocketChannel.socket().setTrafficClass()

after

socketChannel.connect()

has been called triggers a SocketException. Moving the setTrafficClass()
call to before connect() fixes the issue.

Tests on Java 6 show that the setTrafficClass() was having no effect if
called after connect(). It looks like Java 7 has started to throw an
exception when the setter has no effect.

On this basis, I'd say that this particular issue is a long-standing bug
that was masked by Java 6 and is now visible with Java 7.

I'll do a code review and move the setTrafficClass() calls as necessary.

Mark

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


RE: svn commit: r1358055 - in /tomcat/trunk: java/org/apache/catalina/connector/ java/org/apache/catalina/core/ java/org/apache/coyote/ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/ java/org/apache/tomcat/util/net

Posted by "Filip Hanik (mailing lists)" <de...@hanik.com>.
Turning on Java 7 does change the test landscape.
Right now, I can get the test suite to run fine on Java 6, but I get lots of errors on Java 7.

Filip

> -----Original Message-----
> From: Filip Hanik (mailing lists) [mailto:devlists@hanik.com]
> Sent: Friday, July 06, 2012 1:17 AM
> To: 'Tomcat Developers List'
> Subject: RE: svn commit: r1358055 - in /tomcat/trunk:
> java/org/apache/catalina/connector/ java/org/apache/catalina/core/
> java/org/apache/coyote/ java/org/apache/coyote/ajp/
> java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/
> java/org/apache/tomcat/util/net
> 
> NIO Tests:    128
> BIO Tests:    128
> NIO Failures      1
> BIO Failures     10
> 
> I'll take a look at test failures on Monday, unless someone beats me to
> it.
> 
> They are not consistent, as they don't happen when running individual
> tests, only when running the entire test suite.
> 
> Filip
> 
> > -----Original Message-----
> > From: fhanik@apache.org [mailto:fhanik@apache.org]
> > Sent: Friday, July 06, 2012 12:54 AM
> > To: dev@tomcat.apache.org
> > Subject: svn commit: r1358055 - in /tomcat/trunk:
> > java/org/apache/catalina/connector/ java/org/apache/catalina/core/
> > java/org/apache/coyote/ java/org/apache/coyote/ajp/
> > java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/
> > java/org/apache/tomcat/util/net/...
> >
> > Author: fhanik
> > Date: Fri Jul  6 06:53:52 2012
> > New Revision: 1358055
> >
> > URL: http://svn.apache.org/viewvc?rev=1358055&view=rev
> > Log:
> > implement rev 1 of async/non blocking writes
> >
> >
> > Modified:
> >     tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
> >
> >
> tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
> >     tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
> >     tomcat/trunk/java/org/apache/catalina/connector/Request.java
> >     tomcat/trunk/java/org/apache/catalina/connector/Response.java
> >     tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
> >     tomcat/trunk/java/org/apache/coyote/ActionCode.java
> >     tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
> >     tomcat/trunk/java/org/apache/coyote/Response.java
> >     tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> >     tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
> >
> >
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> >
> tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> >     tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> >
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> >
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
> >     tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
> >     tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
> >     tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
> >
> >
> tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.jav
> > a
> >     tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
> >
> > Modified:
> > tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> > ctor/CoyoteAdapter.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
> > Fri Jul  6 06:53:52 2012
> > @@ -296,18 +296,15 @@ public class CoyoteAdapter implements Ad
> >              }
> >
> >
> > -            if (!request.isAsyncDispatching() && request.isAsync()) {
> > -                AtomicBoolean result = new AtomicBoolean(true);
> > -                req.action(ActionCode.ASYNC_DISPATCH_FOR_OPERATION,
> > this);
> > -                if (result.get()) {
> > -                    if (status==SocketStatus.OPEN_WRITE) {
> > -                        //TODO Notify write listener
> > -                    } else if (status==SocketStatus.OPEN_READ) {
> > -                        //TODO Notify read listener
> > -                        asyncConImpl.canRead();
> > -                    }
> > -                    success = true;
> > +            if (!request.isAsyncDispatching() && request.isAsync() &&
> > request.isAsyncOperation()) {
> > +                if (status == SocketStatus.OPEN_WRITE) {
> > +                    // TODO Notify write listener
> > +                    success = asyncConImpl.canWrite();
> > +                } else if (status == SocketStatus.OPEN_READ) {
> > +                    // TODO Notify read listener
> > +                    success = asyncConImpl.canRead();
> >                  }
> > +
> >              }
> >
> >              if (request.isAsyncDispatching()) {
> >
> > Modified:
> >
> tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> >
> ctor/CoyoteOutputStream.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> >
> tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
> > (original)
> > +++
> >
> tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
> > Fri Jul  6 06:53:52 2012
> > @@ -109,23 +109,15 @@ public class CoyoteOutputStream
> >          ob.close();
> >      }
> >
> > -    /**
> > -     * TODO SERVLET 3.1
> > -     */
> >      @Override
> >      public boolean canWrite() {
> > -        // TODO Auto-generated method stub
> > -        return false;
> > +        return ob.canWrite();
> >      }
> >
> >
> > -    /**
> > -     * TODO SERVLET 3.1
> > -     */
> >      @Override
> >      public void setWriteListener(WriteListener listener) {
> > -        // TODO Auto-generated method stub
> > -
> > +        ob.setWriteListener(listener);
> >      }
> >
> >
> >
> > Modified:
> > tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> > ctor/OutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
> > Fri Jul  6 06:53:52 2012
> > @@ -23,6 +23,9 @@ import java.security.AccessController;
> >  import java.security.PrivilegedActionException;
> >  import java.security.PrivilegedExceptionAction;
> >  import java.util.HashMap;
> > +import java.util.concurrent.atomic.AtomicBoolean;
> > +
> > +import javax.servlet.WriteListener;
> >
> >  import org.apache.catalina.Globals;
> >  import org.apache.coyote.ActionCode;
> > @@ -607,4 +610,25 @@ public class OutputBuffer extends Writer
> >      }
> >
> >
> > +    public boolean canWrite() {
> > +        if (getWriteListener()==null) throw new
> > IllegalStateException("not in non blocking mode.");
> > +        AtomicBoolean canWrite = new AtomicBoolean(true);
> > +        coyoteResponse.action(ActionCode.NB_WRITE_INTEREST,
> canWrite);
> > +        return canWrite.get();
> > +}
> > +
> > +
> > +
> > +    private volatile WriteListener listener;
> > +    public void setWriteListener(WriteListener listener) {
> > +        this.listener = listener;
> > +        coyoteResponse.action(ActionCode.SET_WRITE_LISTENER,
> listener);
> > +    }
> > +
> > +    public WriteListener getWriteListener() {
> > +        return listener;
> > +    }
> > +
> > +
> > +
> >  }
> >
> > Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> > ctor/Request.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/catalina/connector/Request.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Fri
> Jul
> > 6 06:53:52 2012
> > @@ -1653,6 +1653,16 @@ public class Request
> >          return result.get();
> >      }
> >
> > +    public boolean isAsyncOperation() {
> > +        if (asyncContext == null) {
> > +            return false;
> > +        }
> > +
> > +        AtomicBoolean result = new AtomicBoolean(false);
> > +        coyoteRequest.action(ActionCode.ASYNC_IS_ASYNC_OPERATION,
> > result);
> > +        return result.get();
> > +    }
> > +
> >      @Override
> >      public boolean isAsyncSupported() {
> >          if (this.asyncSupported == null) {
> >
> > Modified:
> tomcat/trunk/java/org/apache/catalina/connector/Response.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/conne
> > ctor/Response.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/catalina/connector/Response.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/catalina/connector/Response.java Fri
> > Jul  6 06:53:52 2012
> > @@ -153,6 +153,10 @@ public class Response
> >          outputBuffer.setResponse(coyoteResponse);
> >      }
> >
> > +    public org.apache.coyote.Response getCoyoteResponse() {
> > +        return this.coyoteResponse;
> > +    }
> > +
> >
> >      /**
> >       * Return the Context within which this Request is being
> processed.
> >
> > Modified:
> > tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/
> > AsyncContextImpl.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
> Fri
> > Jul  6 06:53:52 2012
> > @@ -106,6 +106,8 @@ public class AsyncContextImpl implements
> >      }
> >
> >      public boolean canRead() throws IOException {
> > +        if (request.getCoyoteRequest().getReadListener()==null)
> return
> > false;
> > +
> >          ClassLoader oldCL =
> > Thread.currentThread().getContextClassLoader();
> >          ClassLoader newCL =
> > request.getContext().getLoader().getClassLoader();
> >          try {
> > @@ -121,7 +123,16 @@ public class AsyncContextImpl implements
> >      }
> >
> >      public boolean canWrite() throws IOException {
> > -        return false;
> > +        if
> > (request.getResponse().getCoyoteResponse().getWriteListener()==null)
> > return false;
> > +        ClassLoader oldCL =
> > Thread.currentThread().getContextClassLoader();
> > +        ClassLoader newCL =
> > request.getContext().getLoader().getClassLoader();
> > +        try {
> > +            Thread.currentThread().setContextClassLoader(newCL);
> > +
> >
> request.getResponse().getCoyoteResponse().getWriteListener().onWritePoss
> > ible();
> > +        }finally {
> > +            Thread.currentThread().setContextClassLoader(oldCL);
> > +    }
> > +        return true;
> >      }
> >
> >      public boolean timeout() throws IOException {
> >
> > Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionC
> > ode.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
> > +++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Fri Jul  6
> > 06:53:52 2012
> > @@ -183,6 +183,11 @@ public enum ActionCode {
> >      ASYNC_IS_ASYNC,
> >
> >      /**
> > +     * Callback to determine if async read/write is in progress
> > +     */
> > +    ASYNC_IS_ASYNC_OPERATION,
> > +
> > +    /**
> >       * Callback to determine if async dispatch is in progress
> >       */
> >      ASYNC_IS_STARTED,
> >
> > Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncSt
> > ateMachine.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java Fri Jul
> > 6 06:53:52 2012
> > @@ -149,6 +149,10 @@ public class AsyncStateMachine<S> {
> >          return state.isAsync();
> >      }
> >
> > +    public boolean isAsyncOperation() {
> > +        return state == AsyncState.READ_WRITE_OP;
> > +    }
> > +
> >      public boolean isAsyncDispatching() {
> >          return state.isDispatching();
> >      }
> >
> > Modified: tomcat/trunk/java/org/apache/coyote/Response.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Respons
> > e.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/Response.java (original)
> > +++ tomcat/trunk/java/org/apache/coyote/Response.java Fri Jul  6
> > 06:53:52 2012
> > @@ -21,6 +21,11 @@ import java.io.IOException;
> >  import java.io.StringReader;
> >  import java.util.Locale;
> >
> > +import javax.servlet.ReadListener;
> > +import javax.servlet.WriteListener;
> > +
> > +import org.apache.coyote.http11.AbstractInputBuffer;
> > +import org.apache.coyote.http11.AbstractOutputBuffer;
> >  import org.apache.tomcat.util.buf.ByteChunk;
> >  import org.apache.tomcat.util.http.MimeHeaders;
> >  import org.apache.tomcat.util.http.parser.AstMediaType;
> > @@ -541,4 +546,29 @@ public final class Response {
> >          }
> >          return outputBuffer.getBytesWritten();
> >      }
> > +
> > +    protected volatile WriteListener listener;
> > +
> > +    public WriteListener getWriteListener() {
> > +        return listener;
> > +}
> > +
> > +    public void setWriteListener(WriteListener listener) {
> > +        //TODO SERVLET 3.1 is it allowed to switch from non block to
> > blocking?
> > +        setBlocking(listener==null);
> > +        this.listener = listener;
> > +    }
> > +
> > +    protected volatile boolean blocking = true;
> > +
> > +    public boolean isBlocking() {
> > +        return blocking;
> > +    }
> > +
> > +    public void setBlocking(boolean blocking) throws
> > IllegalStateException {
> > +        @SuppressWarnings("rawtypes")
> > +        AbstractOutputBuffer buf =
> (AbstractOutputBuffer)outputBuffer;
> > +        if (!blocking && !buf.supportsNonBlocking()) throw new
> > IllegalStateException();
> > +        this.blocking = blocking;
> > +    }
> >  }
> >
> > Modified:
> > tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/Abs
> > tractAjpProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
> > Fri Jul  6 06:53:52 2012
> > @@ -455,6 +455,8 @@ public abstract class AbstractAjpProcess
> >              ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncDispatching());
> >          } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
> >              ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
> > +        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION)
> {
> > +            ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncOperation());
> >          } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
> >              ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncTimingOut());
> >          } else if (actionCode == ActionCode.UPGRADE) {
> >
> > Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/Ajp
> > NioProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Fri
> Jul
> > 6 06:53:52 2012
> > @@ -252,8 +252,7 @@ public class AjpNioProcessor extends Abs
> >
> >          if (actionCode == ActionCode.ASYNC_COMPLETE) {
> >              if (asyncStateMachine.asyncComplete()) {
> > -                ((NioEndpoint)endpoint).processSocket(this.socket,
> > -                        SocketStatus.OPEN_READ, false);
> > +                ((NioEndpoint)endpoint).dispatchForEvent(socket,
> > SocketStatus.OPEN_READ, false);
> >              }
> >          } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
> >              if (param == null) return;
> > @@ -264,11 +263,9 @@ public class AjpNioProcessor extends Abs
> >              }
> >          } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
> >              if (asyncStateMachine.asyncDispatch()) {
> > -                ((NioEndpoint)endpoint).processSocket(this.socket,
> > -                        SocketStatus.OPEN_READ, true);
> > +                ((NioEndpoint)endpoint).dispatchForEvent(socket,
> > SocketStatus.OPEN_READ, true);            }
> >              }
> >          }
> > -    }
> >
> >
> >      // ------------------------------------------------------
> Protected
> > Methods
> >
> > Modified:
> >
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> >
> AbstractHttp11Processor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> >
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> > (original)
> > +++
> >
> tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
> > Fri Jul  6 06:53:52 2012
> > @@ -832,6 +832,8 @@ public abstract class AbstractHttp11Proc
> >              ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncDispatching());
> >          } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
> >              ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
> > +        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION)
> {
> > +            ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncOperation());
> >          } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
> >              ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncTimingOut());
> >          } else if (actionCode == ActionCode.UPGRADE) {
> >
> > Modified:
> > tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> > AbstractOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> > (original)
> > +++
> tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
> > Fri Jul  6 06:53:52 2012
> > @@ -546,4 +546,9 @@ public abstract class AbstractOutputBuff
> >          }
> >      }
> >
> > +    // ---------------------------------------------------------
> Public
> > Methods
> > +
> > +
> > +    public abstract boolean supportsNonBlocking();
> > +
> >  }
> >
> > Modified:
> > tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> > URL:
> >
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/
> > Http11NioProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
> > Fri Jul  6 06:53:52 2012
> > @@ -20,9 +20,11 @@ import java.io.IOException;
> >  import java.io.InterruptedIOException;
> >  import java.net.InetAddress;
> >  import java.nio.channels.SelectionKey;
> > +import java.util.concurrent.atomic.AtomicBoolean;
> >
> >  import javax.net.ssl.SSLEngine;
> >  import javax.servlet.ReadListener;
> > +import javax.servlet.WriteListener;
> >
> >  import org.apache.coyote.ActionCode;
> >  import org.apache.coyote.RequestInfo;
> > @@ -102,19 +104,7 @@ public class Http11NioProcessor extends
> >       */
> >      protected SocketWrapper<NioChannel> socket = null;
> >
> > -    /**
> > -     * TODO SERVLET 3.1
> > -     */
> > -    protected ReadListener readListener;
> > -
> > -    public ReadListener getReadListener() {
> > -        return readListener;
> > -    }
> > -
> > -    public void setReadListener(ReadListener listener) {
> > -        readListener = listener;
> > -    }
> > -
> > +    protected volatile boolean wantOnWritePossible = false;
> >
> >      // ---------------------------------------------------------
> Public
> > Methods
> >
> > @@ -185,6 +175,84 @@ public class Http11NioProcessor extends
> >      }
> >
> >
> > +
> > +
> > +    @Override
> > +    public SocketState asyncDispatch(SocketStatus status) {
> > +        final NioEndpoint.KeyAttachment attach =
> > (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> > +
> > +
> > +        if (status == SocketStatus.OPEN_WRITE) {
> > +            try {
> > +                asyncStateMachine.asyncOperation();
> > +                try {
> > +                    if (outputBuffer.hasDataToWrite()) {
> > +                        //System.out.println("Attempting data
> > flush!!");
> > +                        outputBuffer.flushBuffer(false);
> > +                    }
> > +                }catch (IOException x) {
> > +                    if (log.isDebugEnabled()) log.debug("Unable to
> > write async data.",x);
> > +                    //TODO FIXME-- fix - so we can notify of error
> > +                    return SocketState.CLOSED;
> > +                }
> > +                //return if we have more data to write
> > +                if (isRegisteredForWrite(attach)) {
> > +                    return SocketState.LONG;
> > +                }
> > +            }catch (IllegalStateException x) {
> > +            }
> > +        } else if (status == SocketStatus.OPEN_READ) {
> > +            try {
> > +                try {
> > +                    if (inputBuffer.nbRead()>0) {
> > +                        asyncStateMachine.asyncOperation();
> > +                    }
> > +                }catch (IOException x) {
> > +                    if (log.isDebugEnabled()) log.debug("Unable to
> read
> > async data.",x);
> > +                  //TODO FIXME-- fix - so we can notify of error
> > +                    return SocketState.CLOSED;
> > +                }
> > +                //return if we have more data to write
> > +            }catch (IllegalStateException x) {
> > +            }
> > +        }
> > +
> > +        SocketState state = super.asyncDispatch(status);
> > +        //return if we have more data to write
> > +        if (isRegisteredForWrite(attach)) {
> > +            return SocketState.LONG;
> > +        } else {
> > +            return state;
> > +        }
> > +    }
> > +
> > +
> > +
> > +    @Override
> > +    public SocketState process(SocketWrapper<NioChannel>
> socketWrapper)
> > throws IOException {
> > +        SocketState state = super.process(socketWrapper);
> > +        final NioEndpoint.KeyAttachment attach =
> > (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> > +        //return if we have more data to write
> > +        if (isRegisteredForWrite(attach)) {
> > +            return SocketState.LONG;
> > +        } else {
> > +            return state;
> > +        }
> > +    }
> > +
> > +
> > +
> > +
> > +    protected boolean isRegisteredForWrite(KeyAttachment attach) {
> > +        //return if we have more data to write
> > +        if (outputBuffer.hasDataToWrite()) {
> > +            attach.interestOps(SelectionKey.OP_WRITE);
> > +            return true;
> > +        } else {
> > +            return false;
> > +        }
> > +    }
> > +
> >      @Override
> >      protected void resetTimeouts() {
> >          final NioEndpoint.KeyAttachment attach =
> > (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> > @@ -305,11 +373,14 @@ public class Http11NioProcessor extends
> >      }
> >
> >
> > +
> > +
> >      @Override
> >      public void recycleInternal() {
> >          socket = null;
> >          comet = false;
> >          sendfileData = null;
> > +        wantOnWritePossible = false;
> >      }
> >
> >
> > @@ -492,8 +563,7 @@ public class Http11NioProcessor extends
> >              }
> >          } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
> >              if (asyncStateMachine.asyncComplete()) {
> > -                ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
> > -                        SocketStatus.OPEN_READ, true);
> > +                ((NioEndpoint)endpoint).dispatchForEvent(this.socket.getSocket(),SocketStatus.OPEN_READ, true);
> >              }
> >          } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
> >              if (param==null) {
> > @@ -508,14 +578,15 @@ public class Http11NioProcessor extends
> >              attach.setTimeout(timeout);
> >          } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
> >              if (asyncStateMachine.asyncDispatch()) {
> > -                ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
> > -                        SocketStatus.OPEN_READ, true);
> > +                ((NioEndpoint)endpoint).dispatchForEvent(this.socket.getSocket(),SocketStatus.OPEN_READ, true);
> >              }
> > -        } else if (actionCode ==
> > ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
> > -            asyncStateMachine.asyncOperation();
> >          } else if (actionCode == ActionCode.SET_READ_LISTENER) {
> >              ReadListener listener = (ReadListener)param;
> >              request.setReadListener(listener);
> > +        } else if (actionCode == ActionCode.SET_WRITE_LISTENER) {
> > +            WriteListener listener = (WriteListener)param;
> > +            response.setWriteListener(listener);
> > +            outputBuffer.setBlocking(listener==null);
> >          } else if (actionCode == ActionCode.NB_READ_INTEREST) {
> >              if (socket==null ||
> > socket.getSocket().getAttachment(false)==null) {
> >                  return;
> > @@ -524,8 +595,27 @@ public class Http11NioProcessor extends
> >              if (rp.getStage() ==
> > org.apache.coyote.Constants.STAGE_SERVICE) {
> >                  NioEndpoint.KeyAttachment attach =
> > (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
> >                  attach.interestOps(attach.interestOps() |
> > SelectionKey.OP_READ);
> > +            } else {
> > +                throw new IllegalStateException("Calling isReady
> > asynchronously is illegal.");
> >              }
> > -
> > +        } else if (actionCode == ActionCode.NB_WRITE_INTEREST) {
> > +            if (socket==null ||
> > socket.getSocket().getAttachment(false)==null) {
> > +                return;
> > +        }
> > +            AtomicBoolean canWrite = (AtomicBoolean)param;
> > +            RequestInfo rp = request.getRequestProcessor();
> > +            if (rp.getStage() ==
> > org.apache.coyote.Constants.STAGE_SERVICE) {
> > +                if (outputBuffer.isWritable()) {
> > +                    canWrite.set(true);
> > +                } else {
> > +                    canWrite.set(false);
> > +                    wantOnWritePossible = true;
> > +    }
> > +            } else {
> > +                throw new IllegalStateException("Calling canWrite
> > asynchronously is illegal.");
> > +            }
> > +        } else if (actionCode ==
> > ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
> > +            asyncStateMachine.asyncOperation();
> >          }
> >      }
> >
> >
> > Modified:
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> > (original)
> > +++
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
> > Fri Jul  6 06:53:52 2012
> > @@ -86,6 +86,11 @@ public class InternalAprOutputBuffer ext
> >      // ---------------------------------------------------------
> Public
> > Methods
> >
> >      @Override
> > +    public boolean supportsNonBlocking() {
> > +        return false;
> > +    }
> > +
> > +    @Override
> >      public void init(SocketWrapper<Long> socketWrapper,
> >              AbstractEndpoint endpoint) throws IOException {
> >
> >
> > Modified:
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> > (original)
> > +++
> >
> tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
> > Fri Jul  6 06:53:52 2012
> > @@ -21,6 +21,12 @@ import java.io.IOException;
> >  import java.nio.ByteBuffer;
> >  import java.nio.channels.SelectionKey;
> >  import java.nio.channels.Selector;
> > +import java.util.Collections;
> > +import java.util.Iterator;
> > +import java.util.LinkedList;
> > +import java.util.concurrent.LinkedBlockingDeque;
> > +import java.util.concurrent.atomic.AtomicBoolean;
> > +import java.util.concurrent.atomic.AtomicInteger;
> >
> >  import org.apache.coyote.OutputBuffer;
> >  import org.apache.coyote.Response;
> > @@ -76,9 +82,80 @@ public class InternalNioOutputBuffer ext
> >       */
> >      private NioSelectorPool pool;
> >
> > +    /**
> > +     * Flag used only for Comet requests/responses
> > +     */
> > +    protected volatile boolean blocking = true;
> > +
> > +    /**
> > +     * Track if the byte buffer is flipped
> > +     */
> > +    protected volatile boolean flipped = false;
> > +
> > +    /**
> > +     * For "non-blocking" writes use an external buffer
> > +     */
> > +    protected volatile LinkedBlockingDeque<ByteBufferHolder>
> > bufferedWrite = null;
> > +
> > +    /**
> > +     * The max size of the buffered write buffer
> > +     */
> > +    protected int bufferedWriteSize = 64*1024; //64k default write
> > buffer
> > +
> > +    /**
> > +     * Number of bytes last written
> > +     */
> > +    protected AtomicInteger lastWrite = new AtomicInteger(1);
> > +
> > +    protected class ByteBufferHolder {
> > +        private ByteBuffer buf;
> > +        private AtomicBoolean flipped;
> > +        public ByteBufferHolder(ByteBuffer buf, boolean flipped) {
> > +           this.buf = buf;
> > +           this.flipped = new AtomicBoolean(flipped);
> > +        }
> > +        public ByteBuffer getBuf() {
> > +            return buf;
> > +        }
> > +        public boolean isFlipped() {
> > +            return flipped.get();
> > +        }
> > +
> > +        public boolean flip() {
> > +            if (flipped.compareAndSet(false, true)) {
> > +                buf.flip();
> > +                return true;
> > +            } else {
> > +                return false;
> > +            }
> > +        }
> > +
> > +        public boolean hasData() {
> > +            if (flipped.get()) {
> > +                return buf.remaining()>0;
> > +            } else {
> > +                return buf.position()>0;
> > +            }
> > +        }
> > +
> > +        @Override
> > +        public String toString() {
> > +            StringBuilder builder = new
> > StringBuilder(super.toString());
> > +            builder.append("[flipped=");
> > +            builder.append(isFlipped()?"true, remaining=" : "false,
> > position=");
> > +            builder.append(isFlipped()? buf.remaining():
> > buf.position());
> > +            builder.append("]");
> > +            return builder.toString();
> > +        }
> > +
> > +    }
> >
> >      // ---------------------------------------------------------
> Public
> > Methods
> >
> > +    @Override
> > +    public boolean supportsNonBlocking() {
> > +        return true;
> > +    }
> >
> >      /**
> >       * Flush the response.
> > @@ -91,7 +168,7 @@ public class InternalNioOutputBuffer ext
> >
> >          super.flush();
> >          // Flush the current buffer
> > -        flushBuffer();
> > +        flushBuffer(isBlocking());
> >
> >      }
> >
> > @@ -107,6 +184,9 @@ public class InternalNioOutputBuffer ext
> >              socket.getBufHandler().getWriteBuffer().clear();
> >              socket = null;
> >          }
> > +        lastWrite.set(1);
> > +        setBlocking(true);
> > +        flipped = false;
> >      }
> >
> >
> > @@ -118,7 +198,7 @@ public class InternalNioOutputBuffer ext
> >      @Override
> >      public void endRequest() throws IOException {
> >          super.endRequest();
> > -        flushBuffer();
> > +        flushBuffer(true);
> >      }
> >
> >      // ------------------------------------------------ HTTP/1.1
> Output
> > Methods
> > @@ -146,8 +226,12 @@ public class InternalNioOutputBuffer ext
> >       * @throws IOException
> >       * TODO Fix non blocking write properly
> >       */
> > +    int total = 0;
> >      private synchronized int writeToSocket(ByteBuffer bytebuffer,
> > boolean block, boolean flip) throws IOException {
> > -        if ( flip ) bytebuffer.flip();
> > +        if ( flip ) {
> > +            bytebuffer.flip();
> > +            flipped = true;
> > +        }
> >
> >          int written = 0;
> >          NioEndpoint.KeyAttachment att =
> > (NioEndpoint.KeyAttachment)socket.getAttachment(false);
> > @@ -168,7 +252,14 @@ public class InternalNioOutputBuffer ext
> >          }finally {
> >              if ( selector != null ) pool.put(selector);
> >          }
> > -        if ( block ) bytebuffer.clear(); //only clear
> > +        if ( block || bytebuffer.remaining()==0) {
> > +            //blocking writes must empty the buffer
> > +            //and if remaining==0 then we did empty it
> > +            bytebuffer.clear();
> > +            flipped = false;
> > +        }
> > +        total+= written;
> > +        //System.out.println("Successful write("+written+ " /
> "+total);
> >          return written;
> >      }
> >
> > @@ -204,31 +295,48 @@ public class InternalNioOutputBuffer ext
> >
> >      }
> >
> > +
> >      private synchronized void addToBB(byte[] buf, int offset, int
> > length) throws IOException {
> > -        while (length > 0) {
> > -            int thisTime = length;
> > -            if (socket.getBufHandler().getWriteBuffer().position() ==
> > -
> socket.getBufHandler().getWriteBuffer().capacity()
> > -                    ||
> > socket.getBufHandler().getWriteBuffer().remaining()==0) {
> > -                flushBuffer();
> > -            }
> > -            if (thisTime >
> > socket.getBufHandler().getWriteBuffer().remaining()) {
> > -                thisTime =
> > socket.getBufHandler().getWriteBuffer().remaining();
> > -            }
> > -            socket.getBufHandler().getWriteBuffer().put(buf, offset,
> > thisTime);
> > +        //try to write to socket first
> > +        if (length==0) return;
> > +
> > +        boolean dataLeft = flushBuffer(isBlocking());
> > +
> > +        while (!dataLeft && length>0) {
> > +            int thisTime =
> > transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
> >              length = length - thisTime;
> >              offset = offset + thisTime;
> > +            writeToSocket(socket.getBufHandler().getWriteBuffer(),
> > isBlocking(), true);
> > +            dataLeft = flushBuffer(isBlocking());
> >          }
> > +
> >          NioEndpoint.KeyAttachment ka =
> > (NioEndpoint.KeyAttachment)socket.getAttachment(false);
> >          if ( ka!= null ) ka.access();//prevent timeouts for just
> doing
> > client writes
> > +
> > +        if (!isBlocking() && length>0) {
> > +            //we must buffer as long as it fits
> > +            //ByteBufferHolder tail = bufferedWrite.
> > +            addToBuffers(buf, offset, length);
> > +    }
> > +    }
> > +
> > +    private void addToBuffers(byte[] buf, int offset, int length) {
> > +        ByteBufferHolder holder = bufferedWrite.peekLast();
> > +        if (holder==null || holder.isFlipped() ||
> > holder.getBuf().remaining()<length) {
> > +            ByteBuffer buffer =
> > ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
> > +            holder = new ByteBufferHolder(buffer,false);
> > +            bufferedWrite.add(holder);
> > +        }
> > +        holder.getBuf().put(buf,offset,length);
> >      }
> >
> >
> >      /**
> >       * Callback to write data from the buffer.
> >       */
> > -    private void flushBuffer() throws IOException {
> > +    protected boolean flushBuffer(boolean block) throws IOException {
> >
> > +        int result = 0;
> >          //prevent timeout for async,
> >          SelectionKey key =
> > socket.getIOChannel().keyFor(socket.getPoller().getSelector());
> >          if (key != null) {
> > @@ -236,16 +344,61 @@ public class InternalNioOutputBuffer ext
> >              attach.access();
> >          }
> >
> > +        boolean dataLeft = hasMoreDataToFlush();
> > +
> >          //write to the socket, if there is anything to write
> > -        if (socket.getBufHandler().getWriteBuffer().position() > 0) {
> > -            socket.getBufHandler().getWriteBuffer().flip();
> > -
> writeToSocket(socket.getBufHandler().getWriteBuffer(),true,
> > false);
> > +        if ( dataLeft ) {
> > +            result =
> > writeToSocket(socket.getBufHandler().getWriteBuffer(),block,
> !flipped);
> >          }
> > +
> > +        dataLeft = hasMoreDataToFlush();
> > +
> > +        if (!dataLeft && bufferedWrite!=null) {
> > +            Iterator<ByteBufferHolder> bufIter =
> > bufferedWrite.iterator();
> > +            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
> > +                ByteBufferHolder buffer = bufIter.next();
> > +                buffer.flip();
> > +                while (!hasMoreDataToFlush() &&
> > buffer.getBuf().remaining()>0) {
> > +                    transfer(buffer.getBuf(),
> > socket.getBufHandler().getWriteBuffer());
> > +                    if (buffer.getBuf().remaining() == 0) {
> > +                        bufIter.remove();
> >      }
> > +                    result =
> > writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
> > +                    //here we must break if we didn't finish the
> write
> >
> > +                }
> > +            }
> > +        }
> > +
> > +        dataLeft = hasMoreDataToFlush();
> > +
> > +        return dataLeft;
> > +    }
> > +
> > +    private boolean hasMoreDataToFlush() {
> > +        return (flipped &&
> > socket.getBufHandler().getWriteBuffer().remaining()>0) ||
> > +        (!flipped &&
> socket.getBufHandler().getWriteBuffer().position()
> > > 0);
> > +    }
> > +
> > +    private int transfer(byte[] from, int offset, int length,
> > ByteBuffer to) {
> > +        int max = Math.min(length, to.remaining());
> > +        ByteBuffer tmp = ByteBuffer.wrap(from, offset, max);
> > +        tmp.limit (tmp.position() + max);
> > +        to.put (tmp);
> > +        return max;
> > +    }
> > +
> > +    private int transfer(ByteBuffer from, ByteBuffer to) {
> > +        int max = Math.min(from.remaining(), to.remaining());
> > +        ByteBuffer tmp = from.duplicate ();
> > +        tmp.limit (tmp.position() + max);
> > +        to.put (tmp);
> > +        from.position(from.position() + max);
> > +        return max;
> > +    }
> >
> > -    // ----------------------------------- OutputStreamOutputBuffer
> > Inner Class
> >
> > +    // ----------------------------------- OutputStreamOutputBuffer
> > Inner Class
> >
> >      /**
> >       * This class is an output buffer which will write data to an
> > output
> > @@ -275,4 +428,44 @@ public class InternalNioOutputBuffer ext
> >              return byteCount;
> >          }
> >      }
> > +
> > +    //----------------------------------------non blocking writes ---
> --
> > ------------
> > +    public void setBlocking(boolean blocking) {
> > +        this.blocking = blocking;
> > +        if (blocking)
> > +            bufferedWrite = null;
> > +        else
> > +            bufferedWrite = new
> > LinkedBlockingDeque<ByteBufferHolder>();
> > +}
> > +
> > +    public void setBufferedWriteSize(int bufferedWriteSize) {
> > +        this.bufferedWriteSize = bufferedWriteSize;
> > +    }
> > +
> > +    public boolean isBlocking() {
> > +        return blocking;
> > +    }
> > +
> > +    private boolean hasBufferedData() {
> > +        boolean result = false;
> > +        if (bufferedWrite!=null) {
> > +            Iterator<ByteBufferHolder> iter =
> bufferedWrite.iterator();
> > +            while (!result && iter.hasNext()) {
> > +                result = iter.next().hasData();
> > +            }
> > +        }
> > +        return result;
> > +    }
> > +
> > +    public boolean hasDataToWrite() {
> > +        return hasMoreDataToFlush() || hasBufferedData();
> > +    }
> > +
> > +    public int getBufferedWriteSize() {
> > +        return bufferedWriteSize;
> > +    }
> > +
> > +    public boolean isWritable() {
> > +        return !hasDataToWrite();
> > +    }
> >  }
> >
> > Modified:
> > tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
> > (original)
> > +++
> tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
> > Fri Jul  6 06:53:52 2012
> > @@ -96,6 +96,11 @@ public class InternalOutputBuffer extend
> >      // ---------------------------------------------------------
> Public
> > Methods
> >
> >      @Override
> > +    public boolean supportsNonBlocking() {
> > +        return false;
> > +    }
> > +
> > +    @Override
> >      public void init(SocketWrapper<Socket> socketWrapper,
> >              AbstractEndpoint endpoint) throws IOException {
> >
> >
> > Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Fri
> Jul
> > 6 06:53:52 2012
> > @@ -386,6 +386,8 @@ public class SpdyProcessor extends Abstr
> >              ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncDispatching());
> >          } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
> >              ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
> > +        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION)
> {
> > +            ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncOperation());
> >          } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
> >              ((AtomicBoolean)
> > param).set(asyncStateMachine.isAsyncTimingOut());
> >          } else {
> >
> > Modified:
> tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Fri
> > Jul  6 06:53:52 2012
> > @@ -716,7 +716,16 @@ public class NioEndpoint extends Abstrac
> >          return true;
> >      }
> >
> > -    public boolean processSocket(NioChannel socket, SocketStatus
> > status, boolean dispatch) {
> > +    public boolean dispatchForEvent(NioChannel socket, SocketStatus
> > status, boolean dispatch) {
> > +        if (!dispatch) {
> > +            processSocket(socket,status,dispatch);
> > +        } else {
> > +            socket.getPoller().add(socket, OP_CALLBACK);
> > +        }
> > +        return true;
> > +    }
> > +
> > +    protected boolean processSocket(NioChannel socket, SocketStatus
> > status, boolean dispatch) {
> >          try {
> >              KeyAttachment attachment =
> > (KeyAttachment)socket.getAttachment(false);
> >              if (attachment == null) {
> > @@ -900,7 +909,7 @@ public class NioEndpoint extends Abstrac
> >                          final KeyAttachment att = (KeyAttachment)
> > key.attachment();
> >                          if ( att!=null ) {
> >                              //handle callback flag
> > -                            if (att.getComet() && (interestOps &
> > OP_CALLBACK) == OP_CALLBACK ) {
> > +                            if ((interestOps & OP_CALLBACK) ==
> > OP_CALLBACK ) {
> >                                  att.setCometNotify(true);
> >                              } else {
> >                                  att.setCometNotify(false);
> > @@ -910,7 +919,8 @@ public class NioEndpoint extends Abstrac
> >                              //we are registering the key to start
> with,
> > reset the fairness counter.
> >                              int ops = key.interestOps() |
> interestOps;
> >                              att.interestOps(ops);
> > -                            key.interestOps(ops);
> > +                            if (att.getCometNotify())
> > key.interestOps(0);
> > +                            else key.interestOps(ops);
> >                          } else {
> >                              cancel = true;
> >                          }
> > @@ -979,10 +989,6 @@ public class NioEndpoint extends Abstrac
> >          public void cometInterest(NioChannel socket) {
> >              KeyAttachment att =
> > (KeyAttachment)socket.getAttachment(false);
> >              add(socket,att.getCometOps());
> > -            if ( (att.getCometOps()&OP_CALLBACK) == OP_CALLBACK ) {
> > -                nextExpiration = 0; //force the check for faster
> > callback
> > -                selector.wakeup();
> > -            }
> >          }
> >
> >          /**
> > @@ -1001,6 +1007,9 @@ public class NioEndpoint extends Abstrac
> >              PollerEvent r = eventCache.poll();
> >              if ( r==null) r = new
> PollerEvent(socket,null,interestOps);
> >              else r.reset(socket,null,interestOps);
> > +            if ( (interestOps&OP_CALLBACK) == OP_CALLBACK ) {
> > +                nextExpiration = 0; //force the check for faster
> > callback
> > +            }
> >              addEvent(r);
> >              if (close) {
> >                  processSocket(socket, SocketStatus.STOP, false);
> > @@ -1256,7 +1265,7 @@ public class NioEndpoint extends Abstrac
> >
> >                                  boolean readAndWrite =
> sk.isReadable()
> > && sk.isWritable();
> >                                  reg(sk, attachment, 0);
> > -                                if (readAndWrite) {
> > +                                if (attachment.isAsync() &&
> > readAndWrite) {
> >                                      //remember the that we want to
> know
> > about write too
> >
> > attachment.interestOps(SelectionKey.OP_WRITE);
> >                                  }
> > @@ -1406,7 +1415,7 @@ public class NioEndpoint extends Abstrac
> >              // - the selector simply timed out (suggests there isn't
> > much load)
> >              // - the nextExpiration time has passed
> >              // - the server socket is being closed
> > -            if ((keyCount > 0 || hasEvents) && (now < nextExpiration)
> > && !close) {
> > +            if (nextExpiration > 0 && (keyCount > 0 || hasEvents) &&
> > (now < nextExpiration) && !close) {
> >                  return;
> >              }
> >              //timeout
> > @@ -1421,10 +1430,11 @@ public class NioEndpoint extends Abstrac
> >                          cancelledKey(key, SocketStatus.ERROR); //we
> > don't support any keys without attachments
> >                      } else if ( ka.getError() ) {
> >                          cancelledKey(key, SocketStatus.ERROR);//TODO
> > this is not yet being used
> > -                    } else if (ka.getComet() && ka.getCometNotify() )
> {
> > +                    } else if (ka.getCometNotify() ) {
> >                          ka.setCometNotify(false);
> > -                        reg(key,ka,0);//avoid multiple calls, this
> gets
> > reregistered after invocation
> > -                        //if (!processSocket(ka.getChannel(),
> > SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(),
> > SocketStatus.DISCONNECT);
> > +                        int ops = ka.interestOps() & ~OP_CALLBACK;;
> > +                        reg(key,ka,0);//avoid multiple calls, this
> gets
> > re-registered after invocation
> > +                        ka.interestOps(ops);
> >                          if (!processSocket(ka.getChannel(),
> > SocketStatus.OPEN_READ, true)) processSocket(ka.getChannel(),
> > SocketStatus.DISCONNECT, true);
> >                      } else if
> ((ka.interestOps()&SelectionKey.OP_READ)
> > == SelectionKey.OP_READ ||
> >
> (ka.interestOps()&SelectionKey.OP_WRITE)
> > == SelectionKey.OP_WRITE) {
> >
> > Modified:
> > tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
> > (original)
> > +++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
> > Fri Jul  6 06:53:52 2012
> > @@ -478,7 +478,7 @@ public class SecureNioChannel extends Ni
> >              int written = sc.write(src);
> >              return written;
> >          } else {
> > -            //make sure we can handle expand, and that we only use on
> > buffer
> > +            //make sure we can handle expand, and that we only use
> one
> > buffer
> >              if ( (!this.isSendFile()) && (src !=
> > bufHandler.getWriteBuffer()) ) throw new IllegalArgumentException("You
> > can only write using the application write buffer provided by the
> > handler.");
> >              //are we closing or closed?
> >              if ( closing || closed) throw new IOException("Channel is
> > in closing state.");
> >
> > Modified:
> >
> tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.jav
> > a
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > ---
> >
> tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.jav
> > a (original)
> > +++
> >
> tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.jav
> > a Fri Jul  6 06:53:52 2012
> > @@ -17,10 +17,10 @@
> >  package org.apache.catalina.nonblocking;
> >
> >  import java.io.IOException;
> > +import java.util.Arrays;
> >  import java.util.HashMap;
> >  import java.util.List;
> >  import java.util.Map;
> > -import java.util.Random;
> >
> >  import javax.servlet.AsyncContext;
> >  import javax.servlet.AsyncEvent;
> > @@ -28,14 +28,12 @@ import javax.servlet.AsyncListener;
> >  import javax.servlet.ReadListener;
> >  import javax.servlet.ServletException;
> >  import javax.servlet.ServletInputStream;
> > +import javax.servlet.ServletOutputStream;
> >  import javax.servlet.WriteListener;
> >  import javax.servlet.annotation.WebServlet;
> >  import javax.servlet.http.HttpServletRequest;
> >  import javax.servlet.http.HttpServletResponse;
> >
> > -import org.junit.Assert;
> > -import org.junit.Test;
> > -
> >  import org.apache.catalina.Wrapper;
> >  import org.apache.catalina.core.StandardContext;
> >  import org.apache.catalina.startup.BytesStreamer;
> > @@ -44,9 +42,14 @@ import org.apache.catalina.startup.Tomca
> >  import org.apache.catalina.startup.TomcatBaseTest;
> >  import org.apache.coyote.http11.Http11NioProtocol;
> >  import org.apache.tomcat.util.buf.ByteChunk;
> > +import org.apache.tomcat.util.buf.ByteChunk.ByteOutputChannel;
> > +import org.junit.Assert;
> > +import org.junit.Test;
> >
> >  public class TestNonBlockingAPI extends TomcatBaseTest {
> >
> > +    public static final long bytesToDownload = 1024 * 1024 * 5;
> > +
> >      @Override
> >      protected String getProtocol() {
> >          return Http11NioProtocol.class.getName();
> > @@ -58,31 +61,89 @@ public class TestNonBlockingAPI extends
> >      }
> >
> >      @Test
> > -    public void testOne() throws Exception {
> > +    public void testNonBlockingRead() throws Exception {
> >          // Configure a context with digest auth and a single
> protected
> > resource
> >          Tomcat tomcat = getTomcatInstance();
> >          // Must have a real docBase - just use temp
> > -        StandardContext ctx = (StandardContext)
> > -            tomcat.addContext("",
> > System.getProperty("java.io.tmpdir"));
> > +        StandardContext ctx = (StandardContext) tomcat.addContext("",
> > System.getProperty("java.io.tmpdir"));
> >
> > -        NBTesterServlet servlet = new NBTesterServlet();
> > -        String servletName = NBTesterServlet.class.getName();
> > +        NBReadServlet servlet = new NBReadServlet();
> > +        String servletName = NBReadServlet.class.getName();
> >          Wrapper servletWrapper = tomcat.addServlet(ctx, servletName,
> > servlet);
> >          ctx.addServletMapping("/", servletName);
> >
> >          tomcat.start();
> >
> > -        Map<String,List<String>> resHeaders= new HashMap<String,
> > List<String>>();
> > -        int rc = postUrl(true, new DataWriter(),"http://localhost:" +
> > getPort() + "/", new ByteChunk(),resHeaders, null);
> > +        Map<String, List<String>> resHeaders = new HashMap<String,
> > List<String>>();
> > +        int rc = postUrl(true, new DataWriter(500),
> "http://localhost:"
> > + getPort() + "/", new ByteChunk(),
> > +                resHeaders, null);
> >          Assert.assertEquals(HttpServletResponse.SC_OK, rc);
> >      }
> >
> > +    @Test
> > +    public void testNonBlockingWrite() throws Exception {
> > +        String bind = "localhost";
> > +        // Configure a context with digest auth and a single
> protected
> > resource
> > +        Tomcat tomcat = getTomcatInstance();
> > +        // Must have a real docBase - just use temp
> > +        StandardContext ctx = (StandardContext) tomcat.addContext("",
> > System.getProperty("java.io.tmpdir"));
> > +
> > +        NBWriteServlet servlet = new NBWriteServlet();
> > +        String servletName = NBWriteServlet.class.getName();
> > +        Wrapper servletWrapper = tomcat.addServlet(ctx, servletName,
> > servlet);
> > +        ctx.addServletMapping("/", servletName);
> > +        tomcat.getConnector().setProperty("socket.txBufSize",
> "1024");
> > +        tomcat.getConnector().setProperty("address", bind);
> > +
> > System.out.println(tomcat.getConnector().getProperty("address"));
> > +        tomcat.start();
> > +
> > +        Map<String, List<String>> resHeaders = new HashMap<String,
> > List<String>>();
> > +        ByteChunk slowReader = new ByteChunk();
> > +        slowReader.setLimit(1); // FIXME BUFFER IS BROKEN, 0 doesn't
> > work
> > +        slowReader.setByteOutputChannel(new ByteOutputChannel() {
> > +            long counter = 0;
> > +            long delta = 0;
> > +
> > +            @Override
> > +            public void realWriteBytes(byte[] cbuf, int off, int len)
> > throws IOException {
> > +                try {
> > +                    if (len == 0)
> > +                        return;
> > +                    counter += len;
> > +                    delta += len;
> > +                    if (counter > bytesToDownload) {
> > +                        System.out.println("ERROR Downloaded more than expected ERROR");
> > +                    } else if (counter == bytesToDownload) {
> > +                        System.out.println("Download complete(" +
> > bytesToDownload + " bytes)");
> > +                        // } else if (counter > (1966086)) {
> > +                        // System.out.println("Download almost
> > complete, missing bytes ("+counter+")");
> > +                    } else if (delta > (bytesToDownload / 16)) {
> > +                        System.out.println("Read " + counter + "
> > bytes.");
> > +                        delta = 0;
> > +                        Thread.currentThread().sleep(500);
> > +                    }
> > +                } catch (Exception x) {
> > +                    throw new IOException(x);
> > +                }
> > +            }
> > +        });
> > +        int rc = postUrl(true, new DataWriter(0), "http://" + bind +
> > ":" + getPort() + "/", slowReader, resHeaders,
> > +                null);
> > +        slowReader.flushBuffer();
> > +        Assert.assertEquals(HttpServletResponse.SC_OK, rc);
> > +    }
> >
> >      public static class DataWriter implements BytesStreamer {
> >          final int max = 5;
> >          int count = 0;
> > +        long delay = 0;
> >          byte[] b = "WANTMORE".getBytes();
> >          byte[] f = "FINISHED".getBytes();
> > +
> > +        public DataWriter(long delay) {
> > +            this.delay = delay;
> > +        }
> > +
> >          @Override
> >          public int getLength() {
> >              return b.length * max;
> > @@ -90,7 +151,7 @@ public class TestNonBlockingAPI extends
> >
> >          @Override
> >          public int available() {
> > -            if (count<max) {
> > +            if (count < max) {
> >                  return b.length;
> >              } else {
> >                  return 0;
> > @@ -100,9 +161,14 @@ public class TestNonBlockingAPI extends
> >          @Override
> >          public byte[] next() {
> >              if (count < max) {
> > -                if (count>0) try {Thread.sleep(6000);}catch(Exception
> > x){}
> > +                if (count > 0)
> > +                    try {
> > +                        if (delay > 0)
> > +                            Thread.sleep(delay);
> > +                    } catch (Exception x) {
> > +                    }
> >                  count++;
> > -                if (count<max)
> > +                if (count < max)
> >                      return b;
> >                  else
> >                      return f;
> > @@ -113,12 +179,12 @@ public class TestNonBlockingAPI extends
> >
> >      }
> >
> > -    @WebServlet(asyncSupported=true)
> > -    public static class NBTesterServlet extends TesterServlet {
> > +    @WebServlet(asyncSupported = true)
> > +    public static class NBReadServlet extends TesterServlet {
> >
> >          @Override
> >          protected void service(HttpServletRequest req,
> > HttpServletResponse resp) throws ServletException, IOException {
> > -            //step 1 - start async
> > +            // step 1 - start async
> >              AsyncContext actx = req.startAsync();
> >              actx.setTimeout(Long.MAX_VALUE);
> >              actx.addListener(new AsyncListener() {
> > @@ -147,7 +213,7 @@ public class TestNonBlockingAPI extends
> >
> >                  }
> >              });
> > -            //step 2 - notify on read
> > +            // step 2 - notify on read
> >              ServletInputStream in = req.getInputStream();
> >              ReadListener rlist = new TestReadListener(actx);
> >              in.setReadListener(rlist);
> > @@ -155,17 +221,12 @@ public class TestNonBlockingAPI extends
> >              while (in.isReady()) {
> >                  rlist.onDataAvailable();
> >              }
> > -            //step 3 - notify that we wish to read
> > -            //ServletOutputStream out = resp.getOutputStream();
> > -            //out.setWriteListener(new TestWriteListener(actx));
> > +            // step 3 - notify that we wish to read
> > +            // ServletOutputStream out = resp.getOutputStream();
> > +            // out.setWriteListener(new TestWriteListener(actx));
> >
> >          }
> >
> > -        @Override
> > -        protected void doPost(HttpServletRequest req,
> > HttpServletResponse resp) throws ServletException, IOException {
> > -            doGet(req,resp);
> > -        }
> > -
> >          private class TestReadListener implements ReadListener {
> >              AsyncContext ctx;
> >
> > @@ -177,9 +238,9 @@ public class TestNonBlockingAPI extends
> >              public void onDataAvailable() {
> >                  try {
> >                      ServletInputStream in =
> > ctx.getRequest().getInputStream();
> > -                    int avail=0;
> > +                    int avail = 0;
> >                      String s = "";
> > -                    while ((avail=in.dataAvailable()) > 0) {
> > +                    while ((avail = in.dataAvailable()) > 0) {
> >                          byte[] b = new byte[avail];
> >                          in.read(b);
> >                          s += new String(b);
> > @@ -191,7 +252,7 @@ public class TestNonBlockingAPI extends
> >                      } else {
> >                          in.isReady();
> >                      }
> > -                }catch (Exception x) {
> > +                } catch (Exception x) {
> >                      x.printStackTrace();
> >                      ctx.complete();
> >                  }
> > @@ -212,9 +273,62 @@ public class TestNonBlockingAPI extends
> >              }
> >          }
> >
> > -        private class TestWriteListener implements WriteListener {
> > +    }
> > +
> > +    @WebServlet(asyncSupported = true)
> > +    public static class NBWriteServlet extends TesterServlet {
> >
> > +        @Override
> > +        protected void service(HttpServletRequest req,
> > HttpServletResponse resp) throws ServletException, IOException {
> > +            // step 1 - start async
> > +            AsyncContext actx = req.startAsync();
> > +            actx.setTimeout(Long.MAX_VALUE);
> > +            actx.addListener(new AsyncListener() {
> > +
> > +                @Override
> > +                public void onTimeout(AsyncEvent event) throws
> > IOException {
> > +                    System.out.println("onTimeout");
> > +
> > +                }
> > +
> > +                @Override
> > +                public void onStartAsync(AsyncEvent event) throws
> > IOException {
> > +                    System.out.println("onStartAsync");
> > +
> > +                }
> > +
> > +                @Override
> > +                public void onError(AsyncEvent event) throws
> > IOException {
> > +                    System.out.println("onError");
> > +
> > +                }
> > +
> > +                @Override
> > +                public void onComplete(AsyncEvent event) throws
> > IOException {
> > +                    System.out.println("onComplete");
> > +
> > +                }
> > +            });
> > +            // step 2 - notify on read
> > +            // ServletInputStream in = req.getInputStream();
> > +            // ReadListener rlist = new TestReadListener(actx);
> > +            // in.setReadListener(rlist);
> > +            //
> > +            // while (in.isReady()) {
> > +            // rlist.onDataAvailable();
> > +            // }
> > +            // step 3 - notify that we wish to read
> > +            ServletOutputStream out = resp.getOutputStream();
> > +            resp.setBufferSize(200 * 1024);
> > +            TestWriteListener listener = new TestWriteListener(actx);
> > +            out.setWriteListener(listener);
> > +            listener.onWritePossible();
> > +        }
> > +
> > +        private class TestWriteListener implements WriteListener {
> > +            long chunk = 1024 * 1024;
> >              AsyncContext ctx;
> > +            long bytesToDownload =
> TestNonBlockingAPI.bytesToDownload;
> >
> >              public TestWriteListener(AsyncContext ctx) {
> >                  this.ctx = ctx;
> > @@ -222,36 +336,40 @@ public class TestNonBlockingAPI extends
> >
> >              @Override
> >              public void onWritePossible() {
> > -                // TODO Auto-generated method stub
> > +                System.out.println("onWritePossible");
> > +                try {
> > +                    long left = Math.max(bytesToDownload, 0);
> > +                    long start = System.currentTimeMillis();
> > +                    long end = System.currentTimeMillis();
> > +                    long before = left;
> > +                    while (left > 0 &&
> > ctx.getResponse().getOutputStream().canWrite()) {
> > +                        byte[] b = new byte[(int) Math.min(chunk,
> > bytesToDownload)];
> > +                        Arrays.fill(b, (byte) 'X');
> > +                        ctx.getResponse().getOutputStream().write(b);
> > +                        bytesToDownload -= b.length;
> > +                        left = Math.max(bytesToDownload, 0);
> > +                    }
> > +                    System.out
> > +                            .println("Write took:" + (end - start) + " ms. Bytes before=" + before + " after=" + left);
> > +                    // only call complete if we have emptied the
> buffer
> > +                    if (left == 0 &&
> > ctx.getResponse().getOutputStream().canWrite()) {
> > +                        // it is illegal to call complete
> > +                        // if there is a write in progress
> > +                        ctx.complete();
> > +                    }
> > +                } catch (Exception x) {
> > +                    x.printStackTrace();
> > +                }
> >
> >              }
> >
> >              @Override
> >              public void onError(Throwable throwable) {
> > -                // TODO Auto-generated method stub
> > -
> > +                System.out.println("onError");
> > +                throwable.printStackTrace();
> >              }
> >
> >          }
> >
> > -
> > -
> > -    }
> > -
> > -
> > -    private static class StockQuotes {
> > -        public StockQuotes() {
> > -            super();
> > -        }
> > -        Random r = new Random(System.currentTimeMillis());
> > -
> > -        public String getNextQuote() {
> > -            return String.format("VMW: $%10.0f", r.nextDouble());
> > -        }
> > -
> > -
> > -
> >      }
> > -
> > -
> >  }
> >
> > Modified:
> > tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
> > URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java?rev=1358055&r1=1358054&r2=1358055&view=diff
> >
> ========================================================================
> > ======
> > --- tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
> > (original)
> > +++ tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
> > Fri Jul  6 06:53:52 2012
> > @@ -49,6 +49,7 @@ import org.apache.catalina.core.AprLifec
> >  import org.apache.catalina.core.StandardServer;
> >  import org.apache.catalina.session.StandardManager;
> >  import org.apache.catalina.valves.AccessLogValve;
> > +import org.apache.coyote.http11.Http11NioProtocol;
> >  import org.apache.tomcat.util.buf.ByteChunk;
> >
> >  /**
> > @@ -141,9 +142,9 @@ public abstract class TomcatBaseTest ext
> >          // Has a protocol been specified
> >          String protocol = System.getProperty("tomcat.test.protocol");
> >
> > -        // Use BIO by default
> > +        // Use NIO by default in Tomcat 8
> >          if (protocol == null) {
> > -            protocol = "org.apache.coyote.http11.Http11Protocol";
> > +            protocol = Http11NioProtocol.class.getName();
> >          }
> >
> >          return protocol;
> >
> >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
> > For additional commands, e-mail: dev-help@tomcat.apache.org
> 
> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
> For additional commands, e-mail: dev-help@tomcat.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org