You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2012/07/06 08:53:53 UTC

svn commit: r1358055 - in /tomcat/trunk: java/org/apache/catalina/connector/ java/org/apache/catalina/core/ java/org/apache/coyote/ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/coyote/spdy/ java/org/apache/tomcat/util/net/...

Author: fhanik
Date: Fri Jul  6 06:53:52 2012
New Revision: 1358055

URL: http://svn.apache.org/viewvc?rev=1358055&view=rev
Log:
Implement rev 1 of async/non-blocking writes


Modified:
    tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
    tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
    tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
    tomcat/trunk/java/org/apache/catalina/connector/Request.java
    tomcat/trunk/java/org/apache/catalina/connector/Response.java
    tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
    tomcat/trunk/java/org/apache/coyote/ActionCode.java
    tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
    tomcat/trunk/java/org/apache/coyote/Response.java
    tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
    tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
    tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
    tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java

Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java Fri Jul  6 06:53:52 2012
@@ -296,18 +296,15 @@ public class CoyoteAdapter implements Ad
             }
 
 
-            if (!request.isAsyncDispatching() && request.isAsync()) {
-                AtomicBoolean result = new AtomicBoolean(true);
-                req.action(ActionCode.ASYNC_DISPATCH_FOR_OPERATION, this);
-                if (result.get()) {
-                    if (status==SocketStatus.OPEN_WRITE) {
-                        //TODO Notify write listener
-                    } else if (status==SocketStatus.OPEN_READ) {
-                        //TODO Notify read listener
-                        asyncConImpl.canRead();
-                    }
-                    success = true;
+            if (!request.isAsyncDispatching() && request.isAsync() && request.isAsyncOperation()) {
+                if (status == SocketStatus.OPEN_WRITE) {
+                    // TODO Notify write listener
+                    success = asyncConImpl.canWrite();
+                } else if (status == SocketStatus.OPEN_READ) {
+                    // TODO Notify read listener
+                    success = asyncConImpl.canRead();
                 }
+
             }
 
             if (request.isAsyncDispatching()) {

Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java Fri Jul  6 06:53:52 2012
@@ -109,23 +109,15 @@ public class CoyoteOutputStream
         ob.close();
     }
 
-    /**
-     * TODO SERVLET 3.1
-     */
     @Override
     public boolean canWrite() {
-        // TODO Auto-generated method stub
-        return false;
+        return ob.canWrite();
     }
 
 
-    /**
-     * TODO SERVLET 3.1
-     */
     @Override
     public void setWriteListener(WriteListener listener) {
-        // TODO Auto-generated method stub
-
+        ob.setWriteListener(listener);
     }
 
 

Modified: tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java Fri Jul  6 06:53:52 2012
@@ -23,6 +23,9 @@ import java.security.AccessController;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.servlet.WriteListener;
 
 import org.apache.catalina.Globals;
 import org.apache.coyote.ActionCode;
@@ -607,4 +610,25 @@ public class OutputBuffer extends Writer
     }
 
 
+    public boolean canWrite() {
+        if (getWriteListener()==null) throw new IllegalStateException("not in non blocking mode.");
+        AtomicBoolean canWrite = new AtomicBoolean(true);
+        coyoteResponse.action(ActionCode.NB_WRITE_INTEREST, canWrite);
+        return canWrite.get();
+}
+
+
+
+    private volatile WriteListener listener;
+    public void setWriteListener(WriteListener listener) {
+        this.listener = listener;
+        coyoteResponse.action(ActionCode.SET_WRITE_LISTENER, listener);
+    }
+
+    public WriteListener getWriteListener() {
+        return listener;
+    }
+
+
+
 }

Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Request.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/Request.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Fri Jul  6 06:53:52 2012
@@ -1653,6 +1653,16 @@ public class Request
         return result.get();
     }
 
+    public boolean isAsyncOperation() {
+        if (asyncContext == null) {
+            return false;
+        }
+
+        AtomicBoolean result = new AtomicBoolean(false);
+        coyoteRequest.action(ActionCode.ASYNC_IS_ASYNC_OPERATION, result);
+        return result.get();
+    }
+
     @Override
     public boolean isAsyncSupported() {
         if (this.asyncSupported == null) {

Modified: tomcat/trunk/java/org/apache/catalina/connector/Response.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Response.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/Response.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/Response.java Fri Jul  6 06:53:52 2012
@@ -153,6 +153,10 @@ public class Response
         outputBuffer.setResponse(coyoteResponse);
     }
 
+    public org.apache.coyote.Response getCoyoteResponse() {
+        return this.coyoteResponse;
+    }
+
 
     /**
      * Return the Context within which this Request is being processed.

Modified: tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java (original)
+++ tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java Fri Jul  6 06:53:52 2012
@@ -106,6 +106,8 @@ public class AsyncContextImpl implements
     }
 
     public boolean canRead() throws IOException {
+        if (request.getCoyoteRequest().getReadListener()==null) return false;
+
         ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
         ClassLoader newCL = request.getContext().getLoader().getClassLoader();
         try {
@@ -121,7 +123,16 @@ public class AsyncContextImpl implements
     }
 
     public boolean canWrite() throws IOException {
-        return false;
+        if (request.getResponse().getCoyoteResponse().getWriteListener()==null) return false;
+        ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
+        ClassLoader newCL = request.getContext().getLoader().getClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(newCL);
+            request.getResponse().getCoyoteResponse().getWriteListener().onWritePossible();
+        }finally {
+            Thread.currentThread().setContextClassLoader(oldCL);
+    }
+        return true;
     }
 
     public boolean timeout() throws IOException {

Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Fri Jul  6 06:53:52 2012
@@ -183,6 +183,11 @@ public enum ActionCode {
     ASYNC_IS_ASYNC,
 
     /**
+     * Callback to determine if async read/write is in progress
+     */
+    ASYNC_IS_ASYNC_OPERATION,
+
+    /**
      * Callback to determine if async dispatch is in progress
      */
     ASYNC_IS_STARTED,

Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java Fri Jul  6 06:53:52 2012
@@ -149,6 +149,10 @@ public class AsyncStateMachine<S> {
         return state.isAsync();
     }
 
+    public boolean isAsyncOperation() {
+        return state == AsyncState.READ_WRITE_OP;
+    }
+
     public boolean isAsyncDispatching() {
         return state.isDispatching();
     }

Modified: tomcat/trunk/java/org/apache/coyote/Response.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Response.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Response.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Response.java Fri Jul  6 06:53:52 2012
@@ -21,6 +21,11 @@ import java.io.IOException;
 import java.io.StringReader;
 import java.util.Locale;
 
+import javax.servlet.ReadListener;
+import javax.servlet.WriteListener;
+
+import org.apache.coyote.http11.AbstractInputBuffer;
+import org.apache.coyote.http11.AbstractOutputBuffer;
 import org.apache.tomcat.util.buf.ByteChunk;
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.http.parser.AstMediaType;
@@ -541,4 +546,29 @@ public final class Response {
         }
         return outputBuffer.getBytesWritten();
     }
+
+    protected volatile WriteListener listener;
+
+    public WriteListener getWriteListener() {
+        return listener;
+}
+
+    public void setWriteListener(WriteListener listener) {
+        //TODO SERVLET 3.1 is it allowed to switch from non block to blocking?
+        setBlocking(listener==null);
+        this.listener = listener;
+    }
+
+    protected volatile boolean blocking = true;
+
+    public boolean isBlocking() {
+        return blocking;
+    }
+
+    public void setBlocking(boolean blocking) throws IllegalStateException {
+        @SuppressWarnings("rawtypes")
+        AbstractOutputBuffer buf = (AbstractOutputBuffer)outputBuffer;
+        if (!blocking && !buf.supportsNonBlocking()) throw new IllegalStateException();
+        this.blocking = blocking;
+    }
 }

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Fri Jul  6 06:53:52 2012
@@ -455,6 +455,8 @@ public abstract class AbstractAjpProcess
             ((AtomicBoolean) param).set(asyncStateMachine.isAsyncDispatching());
         } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
             ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
+        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION) {
+            ((AtomicBoolean) param).set(asyncStateMachine.isAsyncOperation());
         } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
             ((AtomicBoolean) param).set(asyncStateMachine.isAsyncTimingOut());
         } else if (actionCode == ActionCode.UPGRADE) {

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Fri Jul  6 06:53:52 2012
@@ -252,8 +252,7 @@ public class AjpNioProcessor extends Abs
 
         if (actionCode == ActionCode.ASYNC_COMPLETE) {
             if (asyncStateMachine.asyncComplete()) {
-                ((NioEndpoint)endpoint).processSocket(this.socket,
-                        SocketStatus.OPEN_READ, false);
+                ((NioEndpoint)endpoint).dispatchForEvent(socket, SocketStatus.OPEN_READ, false);
             }
         } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
             if (param == null) return;
@@ -264,11 +263,9 @@ public class AjpNioProcessor extends Abs
             }
         } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
             if (asyncStateMachine.asyncDispatch()) {
-                ((NioEndpoint)endpoint).processSocket(this.socket,
-                        SocketStatus.OPEN_READ, true);
+                ((NioEndpoint)endpoint).dispatchForEvent(socket, SocketStatus.OPEN_READ, true);            }
             }
         }
-    }
 
 
     // ------------------------------------------------------ Protected Methods

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Fri Jul  6 06:53:52 2012
@@ -832,6 +832,8 @@ public abstract class AbstractHttp11Proc
             ((AtomicBoolean) param).set(asyncStateMachine.isAsyncDispatching());
         } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
             ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
+        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION) {
+            ((AtomicBoolean) param).set(asyncStateMachine.isAsyncOperation());
         } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
             ((AtomicBoolean) param).set(asyncStateMachine.isAsyncTimingOut());
         } else if (actionCode == ActionCode.UPGRADE) {

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java Fri Jul  6 06:53:52 2012
@@ -546,4 +546,9 @@ public abstract class AbstractOutputBuff
         }
     }
 
+    // --------------------------------------------------------- Public Methods
+
+
+    public abstract boolean supportsNonBlocking();
+
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Fri Jul  6 06:53:52 2012
@@ -20,9 +20,11 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetAddress;
 import java.nio.channels.SelectionKey;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.ssl.SSLEngine;
 import javax.servlet.ReadListener;
+import javax.servlet.WriteListener;
 
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.RequestInfo;
@@ -102,19 +104,7 @@ public class Http11NioProcessor extends 
      */
     protected SocketWrapper<NioChannel> socket = null;
 
-    /**
-     * TODO SERVLET 3.1
-     */
-    protected ReadListener readListener;
-
-    public ReadListener getReadListener() {
-        return readListener;
-    }
-
-    public void setReadListener(ReadListener listener) {
-        readListener = listener;
-    }
-
+    protected volatile boolean wantOnWritePossible = false;
 
     // --------------------------------------------------------- Public Methods
 
@@ -185,6 +175,84 @@ public class Http11NioProcessor extends 
     }
 
 
+
+
+    @Override
+    public SocketState asyncDispatch(SocketStatus status) {
+        final NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
+
+
+        if (status == SocketStatus.OPEN_WRITE) {
+            try {
+                asyncStateMachine.asyncOperation();
+                try {
+                    if (outputBuffer.hasDataToWrite()) {
+                        //System.out.println("Attempting data flush!!");
+                        outputBuffer.flushBuffer(false);
+                    }
+                }catch (IOException x) {
+                    if (log.isDebugEnabled()) log.debug("Unable to write async data.",x);
+                    //TODO FIXME-- fix - so we can notify of error
+                    return SocketState.CLOSED;
+                }
+                //return if we have more data to write
+                if (isRegisteredForWrite(attach)) {
+                    return SocketState.LONG;
+                }
+            }catch (IllegalStateException x) {
+            }
+        } else if (status == SocketStatus.OPEN_READ) {
+            try {
+                try {
+                    if (inputBuffer.nbRead()>0) {
+                        asyncStateMachine.asyncOperation();
+                    }
+                }catch (IOException x) {
+                    if (log.isDebugEnabled()) log.debug("Unable to read async data.",x);
+                  //TODO FIXME-- fix - so we can notify of error
+                    return SocketState.CLOSED;
+                }
+                //return if we have more data to write
+            }catch (IllegalStateException x) {
+            }
+        }
+
+        SocketState state = super.asyncDispatch(status);
+        //return if we have more data to write
+        if (isRegisteredForWrite(attach)) {
+            return SocketState.LONG;
+        } else {
+            return state;
+        }
+    }
+
+
+
+    @Override
+    public SocketState process(SocketWrapper<NioChannel> socketWrapper) throws IOException {
+        SocketState state = super.process(socketWrapper);
+        final NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
+        //return if we have more data to write
+        if (isRegisteredForWrite(attach)) {
+            return SocketState.LONG;
+        } else {
+            return state;
+        }
+    }
+
+
+
+
+    protected boolean isRegisteredForWrite(KeyAttachment attach) {
+        //return if we have more data to write
+        if (outputBuffer.hasDataToWrite()) {
+            attach.interestOps(SelectionKey.OP_WRITE);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
     @Override
     protected void resetTimeouts() {
         final NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
@@ -305,11 +373,14 @@ public class Http11NioProcessor extends 
     }
 
 
+
+
     @Override
     public void recycleInternal() {
         socket = null;
         comet = false;
         sendfileData = null;
+        wantOnWritePossible = false;
     }
 
 
@@ -492,8 +563,7 @@ public class Http11NioProcessor extends 
             }
         } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
             if (asyncStateMachine.asyncComplete()) {
-                ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
-                        SocketStatus.OPEN_READ, true);
+                ((NioEndpoint)endpoint).dispatchForEvent(this.socket.getSocket(),SocketStatus.OPEN_READ, true);
             }
         } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
             if (param==null) {
@@ -508,14 +578,15 @@ public class Http11NioProcessor extends 
             attach.setTimeout(timeout);
         } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
             if (asyncStateMachine.asyncDispatch()) {
-                ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
-                        SocketStatus.OPEN_READ, true);
+                ((NioEndpoint)endpoint).dispatchForEvent(this.socket.getSocket(),SocketStatus.OPEN_READ, true);
             }
-        } else if (actionCode == ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
-            asyncStateMachine.asyncOperation();
         } else if (actionCode == ActionCode.SET_READ_LISTENER) {
             ReadListener listener = (ReadListener)param;
             request.setReadListener(listener);
+        } else if (actionCode == ActionCode.SET_WRITE_LISTENER) {
+            WriteListener listener = (WriteListener)param;
+            response.setWriteListener(listener);
+            outputBuffer.setBlocking(listener==null);
         } else if (actionCode == ActionCode.NB_READ_INTEREST) {
             if (socket==null || socket.getSocket().getAttachment(false)==null) {
                 return;
@@ -524,8 +595,27 @@ public class Http11NioProcessor extends 
             if (rp.getStage() == org.apache.coyote.Constants.STAGE_SERVICE) {
                 NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
                 attach.interestOps(attach.interestOps() | SelectionKey.OP_READ);
+            } else {
+                throw new IllegalStateException("Calling isReady asynchronously is illegal.");
             }
-
+        } else if (actionCode == ActionCode.NB_WRITE_INTEREST) {
+            if (socket==null || socket.getSocket().getAttachment(false)==null) {
+                return;
+        }
+            AtomicBoolean canWrite = (AtomicBoolean)param;
+            RequestInfo rp = request.getRequestProcessor();
+            if (rp.getStage() == org.apache.coyote.Constants.STAGE_SERVICE) {
+                if (outputBuffer.isWritable()) {
+                    canWrite.set(true);
+                } else {
+                    canWrite.set(false);
+                    wantOnWritePossible = true;
+    }
+            } else {
+                throw new IllegalStateException("Calling canWrite asynchronously is illegal.");
+            }
+        } else if (actionCode == ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
+            asyncStateMachine.asyncOperation();
         }
     }
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Fri Jul  6 06:53:52 2012
@@ -86,6 +86,11 @@ public class InternalAprOutputBuffer ext
     // --------------------------------------------------------- Public Methods
 
     @Override
+    public boolean supportsNonBlocking() {
+        return false;
+    }
+
+    @Override
     public void init(SocketWrapper<Long> socketWrapper,
             AbstractEndpoint endpoint) throws IOException {
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Fri Jul  6 06:53:52 2012
@@ -21,6 +21,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.coyote.OutputBuffer;
 import org.apache.coyote.Response;
@@ -76,9 +82,80 @@ public class InternalNioOutputBuffer ext
      */
     private NioSelectorPool pool;
 
+    /**
+     * Flag used only for Comet requests/responses
+     */
+    protected volatile boolean blocking = true;
+
+    /**
+     * Track if the byte buffer is flipped
+     */
+    protected volatile boolean flipped = false;
+
+    /**
+     * For "non-blocking" writes use an external buffer
+     */
+    protected volatile LinkedBlockingDeque<ByteBufferHolder> bufferedWrite = null;
+
+    /**
+     * The max size of the buffered write buffer
+     */
+    protected int bufferedWriteSize = 64*1024; //64k default write buffer
+
+    /**
+     * Number of bytes last written
+     */
+    protected AtomicInteger lastWrite = new AtomicInteger(1);
+
+    protected class ByteBufferHolder {
+        private ByteBuffer buf;
+        private AtomicBoolean flipped;
+        public ByteBufferHolder(ByteBuffer buf, boolean flipped) {
+           this.buf = buf;
+           this.flipped = new AtomicBoolean(flipped);
+        }
+        public ByteBuffer getBuf() {
+            return buf;
+        }
+        public boolean isFlipped() {
+            return flipped.get();
+        }
+
+        public boolean flip() {
+            if (flipped.compareAndSet(false, true)) {
+                buf.flip();
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        public boolean hasData() {
+            if (flipped.get()) {
+                return buf.remaining()>0;
+            } else {
+                return buf.position()>0;
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder(super.toString());
+            builder.append("[flipped=");
+            builder.append(isFlipped()?"true, remaining=" : "false, position=");
+            builder.append(isFlipped()? buf.remaining(): buf.position());
+            builder.append("]");
+            return builder.toString();
+        }
+
+    }
 
     // --------------------------------------------------------- Public Methods
 
+    @Override
+    public boolean supportsNonBlocking() {
+        return true;
+    }
 
     /**
      * Flush the response.
@@ -91,7 +168,7 @@ public class InternalNioOutputBuffer ext
 
         super.flush();
         // Flush the current buffer
-        flushBuffer();
+        flushBuffer(isBlocking());
 
     }
 
@@ -107,6 +184,9 @@ public class InternalNioOutputBuffer ext
             socket.getBufHandler().getWriteBuffer().clear();
             socket = null;
         }
+        lastWrite.set(1);
+        setBlocking(true);
+        flipped = false;
     }
 
 
@@ -118,7 +198,7 @@ public class InternalNioOutputBuffer ext
     @Override
     public void endRequest() throws IOException {
         super.endRequest();
-        flushBuffer();
+        flushBuffer(true);
     }
 
     // ------------------------------------------------ HTTP/1.1 Output Methods
@@ -146,8 +226,12 @@ public class InternalNioOutputBuffer ext
      * @throws IOException
      * TODO Fix non blocking write properly
      */
+    int total = 0;
     private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
-        if ( flip ) bytebuffer.flip();
+        if ( flip ) {
+            bytebuffer.flip();
+            flipped = true;
+        }
 
         int written = 0;
         NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
@@ -168,7 +252,14 @@ public class InternalNioOutputBuffer ext
         }finally {
             if ( selector != null ) pool.put(selector);
         }
-        if ( block ) bytebuffer.clear(); //only clear
+        if ( block || bytebuffer.remaining()==0) {
+            //blocking writes must empty the buffer
+            //and if remaining==0 then we did empty it
+            bytebuffer.clear();
+            flipped = false;
+        }
+        total+= written;
+        //System.out.println("Successful write("+written+ " / "+total);
         return written;
     }
 
@@ -204,31 +295,48 @@ public class InternalNioOutputBuffer ext
 
     }
 
+
     private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
-        while (length > 0) {
-            int thisTime = length;
-            if (socket.getBufHandler().getWriteBuffer().position() ==
-                    socket.getBufHandler().getWriteBuffer().capacity()
-                    || socket.getBufHandler().getWriteBuffer().remaining()==0) {
-                flushBuffer();
-            }
-            if (thisTime > socket.getBufHandler().getWriteBuffer().remaining()) {
-                thisTime = socket.getBufHandler().getWriteBuffer().remaining();
-            }
-            socket.getBufHandler().getWriteBuffer().put(buf, offset, thisTime);
+        //try to write to socket first
+        if (length==0) return;
+
+        boolean dataLeft = flushBuffer(isBlocking());
+
+        while (!dataLeft && length>0) {
+            int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
             length = length - thisTime;
             offset = offset + thisTime;
+            writeToSocket(socket.getBufHandler().getWriteBuffer(), isBlocking(), true);
+            dataLeft = flushBuffer(isBlocking());
         }
+
         NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
         if ( ka!= null ) ka.access();//prevent timeouts for just doing client writes
+
+        if (!isBlocking() && length>0) {
+            //we must buffer as long as it fits
+            //ByteBufferHolder tail = bufferedWrite.
+            addToBuffers(buf, offset, length);
+    }
+    }
+
+    private void addToBuffers(byte[] buf, int offset, int length) {
+        ByteBufferHolder holder = bufferedWrite.peekLast();
+        if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
+            ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
+            holder = new ByteBufferHolder(buffer,false);
+            bufferedWrite.add(holder);
+        }
+        holder.getBuf().put(buf,offset,length);
     }
 
 
     /**
      * Callback to write data from the buffer.
      */
-    private void flushBuffer() throws IOException {
+    protected boolean flushBuffer(boolean block) throws IOException {
 
+        int result = 0;
         //prevent timeout for async,
         SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         if (key != null) {
@@ -236,16 +344,61 @@ public class InternalNioOutputBuffer ext
             attach.access();
         }
 
+        boolean dataLeft = hasMoreDataToFlush();
+
         //write to the socket, if there is anything to write
-        if (socket.getBufHandler().getWriteBuffer().position() > 0) {
-            socket.getBufHandler().getWriteBuffer().flip();
-            writeToSocket(socket.getBufHandler().getWriteBuffer(),true, false);
+        if ( dataLeft ) {
+            result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped);
         }
+
+        dataLeft = hasMoreDataToFlush();
+
+        if (!dataLeft && bufferedWrite!=null) {
+            Iterator<ByteBufferHolder> bufIter = bufferedWrite.iterator();
+            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
+                ByteBufferHolder buffer = bufIter.next();
+                buffer.flip();
+                while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+                    transfer(buffer.getBuf(), socket.getBufHandler().getWriteBuffer());
+                    if (buffer.getBuf().remaining() == 0) {
+                        bufIter.remove();
     }
+                    result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
+                    //here we must break if we didn't finish the write
 
+                }
+            }
+        }
+
+        dataLeft = hasMoreDataToFlush();
+
+        return dataLeft;
+    }
+
+    private boolean hasMoreDataToFlush() {
+        return (flipped && socket.getBufHandler().getWriteBuffer().remaining()>0) ||
+        (!flipped && socket.getBufHandler().getWriteBuffer().position() > 0);
+    }
+
+    private int transfer(byte[] from, int offset, int length, ByteBuffer to) {
+        int max = Math.min(length, to.remaining());
+        ByteBuffer tmp = ByteBuffer.wrap(from, offset, max);
+        tmp.limit (tmp.position() + max);
+        to.put (tmp);
+        return max;
+    }
+
+    private int transfer(ByteBuffer from, ByteBuffer to) {
+        int max = Math.min(from.remaining(), to.remaining());
+        ByteBuffer tmp = from.duplicate ();
+        tmp.limit (tmp.position() + max);
+        to.put (tmp);
+        from.position(from.position() + max);
+        return max;
+    }
 
-    // ----------------------------------- OutputStreamOutputBuffer Inner Class
 
+    // ----------------------------------- OutputStreamOutputBuffer Inner Class
 
     /**
      * This class is an output buffer which will write data to an output
@@ -275,4 +428,44 @@ public class InternalNioOutputBuffer ext
             return byteCount;
         }
     }
+
+    //----------------------------------------non blocking writes -----------------
+    public void setBlocking(boolean blocking) {
+        this.blocking = blocking;
+        if (blocking)
+            bufferedWrite = null;
+        else
+            bufferedWrite = new LinkedBlockingDeque<ByteBufferHolder>();
+}
+
+    public void setBufferedWriteSize(int bufferedWriteSize) {
+        this.bufferedWriteSize = bufferedWriteSize;
+    }
+
+    public boolean isBlocking() {
+        return blocking;
+    }
+
+    private boolean hasBufferedData() {
+        boolean result = false;
+        if (bufferedWrite!=null) {
+            Iterator<ByteBufferHolder> iter = bufferedWrite.iterator();
+            while (!result && iter.hasNext()) {
+                result = iter.next().hasData();
+            }
+        }
+        return result;
+    }
+
+    public boolean hasDataToWrite() {
+        return hasMoreDataToFlush() || hasBufferedData();
+    }
+
+    public int getBufferedWriteSize() {
+        return bufferedWriteSize;
+    }
+
+    public boolean isWritable() {
+        return !hasDataToWrite();
+    }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java Fri Jul  6 06:53:52 2012
@@ -96,6 +96,11 @@ public class InternalOutputBuffer extend
     // --------------------------------------------------------- Public Methods
 
     @Override
+    public boolean supportsNonBlocking() {
+        return false;
+    }
+
+    @Override
     public void init(SocketWrapper<Socket> socketWrapper,
             AbstractEndpoint endpoint) throws IOException {
 

Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Fri Jul  6 06:53:52 2012
@@ -386,6 +386,8 @@ public class SpdyProcessor extends Abstr
             ((AtomicBoolean) param).set(asyncStateMachine.isAsyncDispatching());
         } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
             ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
+        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION) {
+            ((AtomicBoolean) param).set(asyncStateMachine.isAsyncOperation());
         } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
             ((AtomicBoolean) param).set(asyncStateMachine.isAsyncTimingOut());
         } else {

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Fri Jul  6 06:53:52 2012
@@ -716,7 +716,16 @@ public class NioEndpoint extends Abstrac
         return true;
     }
 
-    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
+    public boolean dispatchForEvent(NioChannel socket, SocketStatus status, boolean dispatch) {
+        if (!dispatch) {
+            processSocket(socket,status,dispatch);
+        } else {
+            socket.getPoller().add(socket, OP_CALLBACK);
+        }
+        return true;
+    }
+
+    protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
         try {
             KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
             if (attachment == null) {
@@ -900,7 +909,7 @@ public class NioEndpoint extends Abstrac
                         final KeyAttachment att = (KeyAttachment) key.attachment();
                         if ( att!=null ) {
                             //handle callback flag
-                            if (att.getComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
+                            if ((interestOps & OP_CALLBACK) == OP_CALLBACK ) {
                                 att.setCometNotify(true);
                             } else {
                                 att.setCometNotify(false);
@@ -910,7 +919,8 @@ public class NioEndpoint extends Abstrac
                             //we are registering the key to start with, reset the fairness counter.
                             int ops = key.interestOps() | interestOps;
                             att.interestOps(ops);
-                            key.interestOps(ops);
+                            if (att.getCometNotify()) key.interestOps(0);
+                            else key.interestOps(ops);
                         } else {
                             cancel = true;
                         }
@@ -979,10 +989,6 @@ public class NioEndpoint extends Abstrac
         public void cometInterest(NioChannel socket) {
             KeyAttachment att = (KeyAttachment)socket.getAttachment(false);
             add(socket,att.getCometOps());
-            if ( (att.getCometOps()&OP_CALLBACK) == OP_CALLBACK ) {
-                nextExpiration = 0; //force the check for faster callback
-                selector.wakeup();
-            }
         }
 
         /**
@@ -1001,6 +1007,9 @@ public class NioEndpoint extends Abstrac
             PollerEvent r = eventCache.poll();
             if ( r==null) r = new PollerEvent(socket,null,interestOps);
             else r.reset(socket,null,interestOps);
+            if ( (interestOps&OP_CALLBACK) == OP_CALLBACK ) {
+                nextExpiration = 0; //force the check for faster callback
+            }
             addEvent(r);
             if (close) {
                 processSocket(socket, SocketStatus.STOP, false);
@@ -1256,7 +1265,7 @@ public class NioEndpoint extends Abstrac
 
                                 boolean readAndWrite = sk.isReadable() && sk.isWritable();
                                 reg(sk, attachment, 0);
-                                if (readAndWrite) {
+                                if (attachment.isAsync() && readAndWrite) {
                                     //remember the that we want to know about write too
                                     attachment.interestOps(SelectionKey.OP_WRITE);
                                 }
@@ -1406,7 +1415,7 @@ public class NioEndpoint extends Abstrac
             // - the selector simply timed out (suggests there isn't much load)
             // - the nextExpiration time has passed
             // - the server socket is being closed
-            if ((keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
+            if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
                 return;
             }
             //timeout
@@ -1421,10 +1430,11 @@ public class NioEndpoint extends Abstrac
                         cancelledKey(key, SocketStatus.ERROR); //we don't support any keys without attachments
                     } else if ( ka.getError() ) {
                         cancelledKey(key, SocketStatus.ERROR);//TODO this is not yet being used
-                    } else if (ka.getComet() && ka.getCometNotify() ) {
+                    } else if (ka.getCometNotify() ) {
                         ka.setCometNotify(false);
-                        reg(key,ka,0);//avoid multiple calls, this gets reregistered after invocation
-                        //if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
+                        int ops = ka.interestOps() & ~OP_CALLBACK;;
+                        reg(key,ka,0);//avoid multiple calls, this gets re-registered after invocation
+                        ka.interestOps(ops);
                         if (!processSocket(ka.getChannel(), SocketStatus.OPEN_READ, true)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT, true);
                     } else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ ||
                               (ka.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Fri Jul  6 06:53:52 2012
@@ -478,7 +478,7 @@ public class SecureNioChannel extends Ni
             int written = sc.write(src);
             return written;
         } else {
-            //make sure we can handle expand, and that we only use on buffer
+            //make sure we can handle expand, and that we only use one buffer
             if ( (!this.isSendFile()) && (src != bufHandler.getWriteBuffer()) ) throw new IllegalArgumentException("You can only write using the application write buffer provided by the handler.");
             //are we closing or closed?
             if ( closing || closed) throw new IOException("Channel is in closing state.");

Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Fri Jul  6 06:53:52 2012
@@ -17,10 +17,10 @@
 package org.apache.catalina.nonblocking;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import javax.servlet.AsyncContext;
 import javax.servlet.AsyncEvent;
@@ -28,14 +28,12 @@ import javax.servlet.AsyncListener;
 import javax.servlet.ReadListener;
 import javax.servlet.ServletException;
 import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
 import javax.servlet.WriteListener;
 import javax.servlet.annotation.WebServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.junit.Assert;
-import org.junit.Test;
-
 import org.apache.catalina.Wrapper;
 import org.apache.catalina.core.StandardContext;
 import org.apache.catalina.startup.BytesStreamer;
@@ -44,9 +42,14 @@ import org.apache.catalina.startup.Tomca
 import org.apache.catalina.startup.TomcatBaseTest;
 import org.apache.coyote.http11.Http11NioProtocol;
 import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.ByteChunk.ByteOutputChannel;
+import org.junit.Assert;
+import org.junit.Test;
 
 public class TestNonBlockingAPI extends TomcatBaseTest {
 
+    public static final long bytesToDownload = 1024 * 1024 * 5;
+
     @Override
     protected String getProtocol() {
         return Http11NioProtocol.class.getName();
@@ -58,31 +61,89 @@ public class TestNonBlockingAPI extends 
     }
 
     @Test
-    public void testOne() throws Exception {
+    public void testNonBlockingRead() throws Exception {
         // Configure a context with digest auth and a single protected resource
         Tomcat tomcat = getTomcatInstance();
         // Must have a real docBase - just use temp
-        StandardContext ctx = (StandardContext)
-            tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+        StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir"));
 
-        NBTesterServlet servlet = new NBTesterServlet();
-        String servletName = NBTesterServlet.class.getName();
+        NBReadServlet servlet = new NBReadServlet();
+        String servletName = NBReadServlet.class.getName();
         Wrapper servletWrapper = tomcat.addServlet(ctx, servletName, servlet);
         ctx.addServletMapping("/", servletName);
 
         tomcat.start();
 
-        Map<String,List<String>> resHeaders= new HashMap<String, List<String>>();
-        int rc = postUrl(true, new DataWriter(),"http://localhost:" + getPort() + "/", new ByteChunk(),resHeaders, null);
+        Map<String, List<String>> resHeaders = new HashMap<String, List<String>>();
+        int rc = postUrl(true, new DataWriter(500), "http://localhost:" + getPort() + "/", new ByteChunk(),
+                resHeaders, null);
         Assert.assertEquals(HttpServletResponse.SC_OK, rc);
     }
 
+    @Test
+    public void testNonBlockingWrite() throws Exception {
+        String bind = "localhost";
+        // Configure a context with a single servlet that performs non-blocking writes
+        Tomcat tomcat = getTomcatInstance();
+        // Must have a real docBase - just use temp
+        StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+
+        NBWriteServlet servlet = new NBWriteServlet();
+        String servletName = NBWriteServlet.class.getName();
+        Wrapper servletWrapper = tomcat.addServlet(ctx, servletName, servlet);
+        ctx.addServletMapping("/", servletName);
+        tomcat.getConnector().setProperty("socket.txBufSize", "1024");
+        tomcat.getConnector().setProperty("address", bind);
+        System.out.println(tomcat.getConnector().getProperty("address"));
+        tomcat.start();
+
+        Map<String, List<String>> resHeaders = new HashMap<String, List<String>>();
+        ByteChunk slowReader = new ByteChunk();
+        slowReader.setLimit(1); // FIXME BUFFER IS BROKEN, 0 doesn't work
+        slowReader.setByteOutputChannel(new ByteOutputChannel() {
+            long counter = 0;
+            long delta = 0;
+
+            @Override
+            public void realWriteBytes(byte[] cbuf, int off, int len) throws IOException {
+                try {
+                    if (len == 0)
+                        return;
+                    counter += len;
+                    delta += len;
+                    if (counter > bytesToDownload) {
+                        System.out.println("ERROR Downloaded more than expected ERROR");
+                    } else if (counter == bytesToDownload) {
+                        System.out.println("Download complete(" + bytesToDownload + " bytes)");
+                        // } else if (counter > (1966086)) {
+                        // System.out.println("Download almost complete, missing bytes ("+counter+")");
+                    } else if (delta > (bytesToDownload / 16)) {
+                        System.out.println("Read " + counter + " bytes.");
+                        delta = 0;
+                        Thread.currentThread().sleep(500);
+                    }
+                } catch (Exception x) {
+                    throw new IOException(x);
+                }
+            }
+        });
+        int rc = postUrl(true, new DataWriter(0), "http://" + bind + ":" + getPort() + "/", slowReader, resHeaders,
+                null);
+        slowReader.flushBuffer();
+        Assert.assertEquals(HttpServletResponse.SC_OK, rc);
+    }
 
     public static class DataWriter implements BytesStreamer {
         final int max = 5;
         int count = 0;
+        long delay = 0;
         byte[] b = "WANTMORE".getBytes();
         byte[] f = "FINISHED".getBytes();
+
+        public DataWriter(long delay) {
+            this.delay = delay;
+        }
+
         @Override
         public int getLength() {
             return b.length * max;
@@ -90,7 +151,7 @@ public class TestNonBlockingAPI extends 
 
         @Override
         public int available() {
-            if (count<max) {
+            if (count < max) {
                 return b.length;
             } else {
                 return 0;
@@ -100,9 +161,14 @@ public class TestNonBlockingAPI extends 
         @Override
         public byte[] next() {
             if (count < max) {
-                if (count>0) try {Thread.sleep(6000);}catch(Exception x){}
+                if (count > 0)
+                    try {
+                        if (delay > 0)
+                            Thread.sleep(delay);
+                    } catch (Exception x) {
+                    }
                 count++;
-                if (count<max)
+                if (count < max)
                     return b;
                 else
                     return f;
@@ -113,12 +179,12 @@ public class TestNonBlockingAPI extends 
 
     }
 
-    @WebServlet(asyncSupported=true)
-    public static class NBTesterServlet extends TesterServlet {
+    @WebServlet(asyncSupported = true)
+    public static class NBReadServlet extends TesterServlet {
 
         @Override
         protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-            //step 1 - start async
+            // step 1 - start async
             AsyncContext actx = req.startAsync();
             actx.setTimeout(Long.MAX_VALUE);
             actx.addListener(new AsyncListener() {
@@ -147,7 +213,7 @@ public class TestNonBlockingAPI extends 
 
                 }
             });
-            //step 2 - notify on read
+            // step 2 - notify on read
             ServletInputStream in = req.getInputStream();
             ReadListener rlist = new TestReadListener(actx);
             in.setReadListener(rlist);
@@ -155,17 +221,12 @@ public class TestNonBlockingAPI extends 
             while (in.isReady()) {
                 rlist.onDataAvailable();
             }
-            //step 3 - notify that we wish to read
-            //ServletOutputStream out = resp.getOutputStream();
-            //out.setWriteListener(new TestWriteListener(actx));
+            // step 3 - notify that we wish to read
+            // ServletOutputStream out = resp.getOutputStream();
+            // out.setWriteListener(new TestWriteListener(actx));
 
         }
 
-        @Override
-        protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-            doGet(req,resp);
-        }
-
         private class TestReadListener implements ReadListener {
             AsyncContext ctx;
 
@@ -177,9 +238,9 @@ public class TestNonBlockingAPI extends 
             public void onDataAvailable() {
                 try {
                     ServletInputStream in = ctx.getRequest().getInputStream();
-                    int avail=0;
+                    int avail = 0;
                     String s = "";
-                    while ((avail=in.dataAvailable()) > 0) {
+                    while ((avail = in.dataAvailable()) > 0) {
                         byte[] b = new byte[avail];
                         in.read(b);
                         s += new String(b);
@@ -191,7 +252,7 @@ public class TestNonBlockingAPI extends 
                     } else {
                         in.isReady();
                     }
-                }catch (Exception x) {
+                } catch (Exception x) {
                     x.printStackTrace();
                     ctx.complete();
                 }
@@ -212,9 +273,62 @@ public class TestNonBlockingAPI extends 
             }
         }
 
-        private class TestWriteListener implements WriteListener {
+    }
+
+    @WebServlet(asyncSupported = true)
+    public static class NBWriteServlet extends TesterServlet {
 
+        @Override
+        protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+            // step 1 - start async
+            AsyncContext actx = req.startAsync();
+            actx.setTimeout(Long.MAX_VALUE);
+            actx.addListener(new AsyncListener() {
+
+                @Override
+                public void onTimeout(AsyncEvent event) throws IOException {
+                    System.out.println("onTimeout");
+
+                }
+
+                @Override
+                public void onStartAsync(AsyncEvent event) throws IOException {
+                    System.out.println("onStartAsync");
+
+                }
+
+                @Override
+                public void onError(AsyncEvent event) throws IOException {
+                    System.out.println("onError");
+
+                }
+
+                @Override
+                public void onComplete(AsyncEvent event) throws IOException {
+                    System.out.println("onComplete");
+
+                }
+            });
+            // step 2 - notify on read
+            // ServletInputStream in = req.getInputStream();
+            // ReadListener rlist = new TestReadListener(actx);
+            // in.setReadListener(rlist);
+            //
+            // while (in.isReady()) {
+            // rlist.onDataAvailable();
+            // }
+            // step 3 - notify that we wish to write
+            ServletOutputStream out = resp.getOutputStream();
+            resp.setBufferSize(200 * 1024);
+            TestWriteListener listener = new TestWriteListener(actx);
+            out.setWriteListener(listener);
+            listener.onWritePossible();
+        }
+
+        private class TestWriteListener implements WriteListener {
+            long chunk = 1024 * 1024;
             AsyncContext ctx;
+            long bytesToDownload = TestNonBlockingAPI.bytesToDownload;
 
             public TestWriteListener(AsyncContext ctx) {
                 this.ctx = ctx;
@@ -222,36 +336,40 @@ public class TestNonBlockingAPI extends 
 
             @Override
             public void onWritePossible() {
-                // TODO Auto-generated method stub
+                System.out.println("onWritePossible");
+                try {
+                    long left = Math.max(bytesToDownload, 0);
+                    long start = System.currentTimeMillis();
+                    long end = System.currentTimeMillis();
+                    long before = left;
+                    while (left > 0 && ctx.getResponse().getOutputStream().canWrite()) {
+                        byte[] b = new byte[(int) Math.min(chunk, bytesToDownload)];
+                        Arrays.fill(b, (byte) 'X');
+                        ctx.getResponse().getOutputStream().write(b);
+                        bytesToDownload -= b.length;
+                        left = Math.max(bytesToDownload, 0);
+                    }
+                    System.out
+                            .println("Write took:" + (end - start) + " ms. Bytes before=" + before + " after=" + left);
+                    // only call complete if we have emptied the buffer
+                    if (left == 0 && ctx.getResponse().getOutputStream().canWrite()) {
+                        // it is illegal to call complete
+                        // if there is a write in progress
+                        ctx.complete();
+                    }
+                } catch (Exception x) {
+                    x.printStackTrace();
+                }
 
             }
 
             @Override
             public void onError(Throwable throwable) {
-                // TODO Auto-generated method stub
-
+                System.out.println("onError");
+                throwable.printStackTrace();
             }
 
         }
 
-
-
-    }
-
-
-    private static class StockQuotes {
-        public StockQuotes() {
-            super();
-        }
-        Random r = new Random(System.currentTimeMillis());
-
-        public String getNextQuote() {
-            return String.format("VMW: $%10.0f", r.nextDouble());
-        }
-
-
-
     }
-
-
 }

Modified: tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java (original)
+++ tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java Fri Jul  6 06:53:52 2012
@@ -49,6 +49,7 @@ import org.apache.catalina.core.AprLifec
 import org.apache.catalina.core.StandardServer;
 import org.apache.catalina.session.StandardManager;
 import org.apache.catalina.valves.AccessLogValve;
+import org.apache.coyote.http11.Http11NioProtocol;
 import org.apache.tomcat.util.buf.ByteChunk;
 
 /**
@@ -141,9 +142,9 @@ public abstract class TomcatBaseTest ext
         // Has a protocol been specified
         String protocol = System.getProperty("tomcat.test.protocol");
 
-        // Use BIO by default
+        // Use NIO by default in Tomcat 8
         if (protocol == null) {
-            protocol = "org.apache.coyote.http11.Http11Protocol";
+            protocol = Http11NioProtocol.class.getName();
         }
 
         return protocol;



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org