You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2012/07/04 03:56:49 UTC

svn commit: r1357039 - in /tomcat/trunk: java/javax/servlet/ java/org/apache/catalina/connector/ java/org/apache/catalina/core/ java/org/apache/coyote/ java/org/apache/coyote/http11/ java/org/apache/tomcat/util/net/ test/org/apache/catalina/nonblocking/

Author: fhanik
Date: Wed Jul  4 01:56:46 2012
New Revision: 1357039

URL: http://svn.apache.org/viewvc?rev=1357039&view=rev
Log:
First revision of an example non blocking read operation.
The servlet specification, and discussions on the expert group are quite contradictory.
According to the specification request response object can only live during service() or 
when an AsyncContext is present. but the NIO api is written with examples of bypassing both.

So for this iteration, we are working with the assumption that 
NIO is only allowed during async operations, otherwise there is no point to NIO.
This is because 'polling' was not allowed, so there is no point to have NIO when the thread is present


Added:
    tomcat/trunk/test/org/apache/catalina/nonblocking/
    tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java   (with props)
Modified:
    tomcat/trunk/java/javax/servlet/ServletInputStream.java
    tomcat/trunk/java/javax/servlet/ServletOutputStream.java
    tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
    tomcat/trunk/java/org/apache/catalina/connector/CoyoteInputStream.java
    tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java
    tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
    tomcat/trunk/java/org/apache/coyote/ActionCode.java
    tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
    tomcat/trunk/java/org/apache/coyote/Request.java
    tomcat/trunk/java/org/apache/coyote/http11/AbstractInputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketStatus.java

Modified: tomcat/trunk/java/javax/servlet/ServletInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/servlet/ServletInputStream.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/javax/servlet/ServletInputStream.java (original)
+++ tomcat/trunk/java/javax/servlet/ServletInputStream.java Wed Jul  4 01:56:46 2012
@@ -94,8 +94,11 @@ public abstract class ServletInputStream
      * @return
      */
     public abstract boolean isFinished();
+
     /**
      * TODO SERVLET 3.1
+     * If this returns false, the container will invoke
+     * {@link ReadListener#onDataAvailable()} when data is available.
      * @return
      */
     public abstract boolean isReady();

Modified: tomcat/trunk/java/javax/servlet/ServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/servlet/ServletOutputStream.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/javax/servlet/ServletOutputStream.java (original)
+++ tomcat/trunk/java/javax/servlet/ServletOutputStream.java Wed Jul  4 01:56:46 2012
@@ -273,6 +273,8 @@ public abstract class ServletOutputStrea
     }
 
     /**
+     * If this returns false, it will cause a callback to
+     * {@link WriteListener#onWritePossible()} when the buffer has emptied
      * TODO SERVLET 3.1
      * @return
      */

Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java Wed Jul  4 01:56:46 2012
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.Charset;
 import java.util.EnumSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.servlet.RequestDispatcher;
 import javax.servlet.SessionTrackingMode;
@@ -293,6 +294,22 @@ public class CoyoteAdapter implements Ad
                     asyncConImpl.setErrorState(null);
                 }
             }
+
+
+            if (!request.isAsyncDispatching() && request.isAsync()) {
+                AtomicBoolean result = new AtomicBoolean(true);
+                req.action(ActionCode.ASYNC_DISPATCH_FOR_OPERATION, this);
+                if (result.get()) {
+                    if (status==SocketStatus.OPEN_WRITE) {
+                        //TODO Notify write listener
+                    } else if (status==SocketStatus.OPEN) {
+                        //TODO Notify read listener
+                        asyncConImpl.canRead();
+                    }
+                    success = true;
+                }
+            }
+
             if (request.isAsyncDispatching()) {
                 success = true;
                 connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteInputStream.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/CoyoteInputStream.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteInputStream.java Wed Jul  4 01:56:46 2012
@@ -242,25 +242,20 @@ public class CoyoteInputStream
         }
     }
 
-    /**
-     * TODO SERVLET 3.1
-     */
     @Override
     public boolean isFinished() {
-        // TODO Auto-generated method stub
-        return false;
+        return ib.isFinished();
     }
 
     @Override
     public int dataAvailable() {
-        // TODO Auto-generated method stub
-        return 0;
+        return ib.dataAvailable();
     }
 
 
     @Override
     public boolean isReady() {
-        return dataAvailable()>0;
+        return ib.isReady();
     }
 
 
@@ -269,10 +264,8 @@ public class CoyoteInputStream
      */
     @Override
     public void setReadListener(ReadListener listener) {
-        // TODO Auto-generated method stub
-
+        ib.setReadListener(listener);
     }
 
 
-
 }

Modified: tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java Wed Jul  4 01:56:46 2012
@@ -23,6 +23,8 @@ import java.security.PrivilegedActionExc
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 
+import javax.servlet.ReadListener;
+
 import org.apache.catalina.security.SecurityUtil;
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.Request;
@@ -247,6 +249,51 @@ public class InputBuffer extends Reader
         return available;
     }
 
+    private volatile ReadListener listener;
+    public void setReadListener(ReadListener listener) {
+        this.listener = listener;
+        coyoteRequest.action(ActionCode.SET_READ_LISTENER, listener);
+    }
+
+    public ReadListener getReadListener() {
+        return listener;
+    }
+
+    public boolean isFinished() {
+        return dataAvailable()==0;
+    }
+
+    public int dataAvailable() {
+        if (getReadListener()==null) throw new IllegalStateException("not in non blocking mode.");
+        int result = 0;
+        //first check if we have buffered something already
+        result = available();
+
+        if(result <= 0) {
+            //here we can issue a non blocking read
+            //if supported
+            //TODO SERVLET 3.1
+        }
+
+        return result;
+
+    }
+
+
+    public boolean isReady() {
+        if (getReadListener()==null) throw new IllegalStateException("not in non blocking mode.");
+        int available = dataAvailable();
+        boolean result = available>0;
+        if (!result) {
+            coyoteRequest.action(ActionCode.NB_READ_INTEREST, null);
+        }
+        return result;
+    }
+
+
+
+
+
 
     // ------------------------------------------------- Bytes Handling Methods
 
@@ -441,7 +488,9 @@ public class InputBuffer extends Reader
         if (closed) {
             throw new IOException(sm.getString("inputBuffer.streamClosed"));
         }
-
+        if (state == INITIAL_STATE) {
+            state = CHAR_STATE;
+        }
         return (available() > 0);
     }
 

Modified: tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java (original)
+++ tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java Wed Jul  4 01:56:46 2012
@@ -105,6 +105,25 @@ public class AsyncContextImpl implements
         }
     }
 
+    public boolean canRead() throws IOException {
+        ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
+        ClassLoader newCL = request.getContext().getLoader().getClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(newCL);
+            request.getCoyoteRequest().getReadListener().onDataAvailable();
+            if (request.getInputStream().dataAvailable()==0) {
+                request.getCoyoteRequest().getReadListener().onAllDataRead();
+            }
+        }finally {
+            Thread.currentThread().setContextClassLoader(oldCL);
+        }
+        return true;
+    }
+
+    public boolean canWrite() throws IOException {
+        return false;
+    }
+
     public boolean timeout() throws IOException {
         AtomicBoolean result = new AtomicBoolean();
         request.getCoyoteRequest().action(ActionCode.ASYNC_TIMEOUT, result);

Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Wed Jul  4 01:56:46 2012
@@ -137,6 +137,13 @@ public enum ActionCode {
     ASYNC_DISPATCH,
 
     /**
+     * Callback when an async call is
+     * {@link javax.servlet.ReadListener#onDataAvailable()} or
+     * {@link javax.servlet.WriteListener#onWritePossible()}
+     */
+    ASYNC_DISPATCH_FOR_OPERATION,
+
+    /**
      * Callback to indicate the the actual dispatch has started and that the
      * async state needs change.
      */
@@ -193,5 +200,27 @@ public enum ActionCode {
     /**
      * Callback to trigger the HTTP upgrade process.
      */
-    UPGRADE
+    UPGRADE,
+
+    /**
+     * Callback to trigger setting the ReadListener
+     */
+    SET_READ_LISTENER,
+
+    /**
+     * Callback to trigger setting the WriteListener
+     */
+    SET_WRITE_LISTENER,
+
+    /**
+     * Indicator that Servlet is interested in being
+     * notified when data is available to be read
+     */
+    NB_READ_INTEREST,
+
+    /**
+     *Indicator that the Servlet is interested
+     *in being notified when it can write data
+     */
+    NB_WRITE_INTEREST
 }

Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java Wed Jul  4 01:56:46 2012
@@ -69,18 +69,18 @@ import org.apache.tomcat.util.res.String
  * |       |          |               |                       |               |
  * |       |          |              \|/                     \|/   complete() |
  * |       |          |         MUST_DISPATCH              STARTED---->-------|
- * |       |          |           |                         |   |
- * |       |          |           |postProcess()            |   |
- * ^       ^          |           |              dispatch() |   |auto
- * |       |          |           |    |--------------------|   |
- * |       |          | auto     \|/  \|/                      \|/
- * |       |          |---<----DISPATCHING<-----------------TIMING_OUT
- * |       |                                  dispatch()      |   |
- * |       |                                                  |   |
- * |       |-------<-------------------------------------<----|   |
- * |                              complete()                      |
- * |                                                              |
- * |----<------------------------<-----------------------------<--|
+ * |       |          |           |                         |   |  \ \
+ * |       |          |           |postProcess()            |   |   \ \
+ * ^       ^          |           |              dispatch() |   |auto\ \
+ * |       |          |           |    |--------------------|   |     \ \----<------
+ * |       |          | auto     \|/  \|/                      \|/     \            \
+ * |       |          |---<----DISPATCHING<-----------------TIMING_OUT  \            \
+ * |       |                                  dispatch()      |   |      |            \
+ * |       |                                                  |   |     \|/            |
+ * |       |-------<-------------------------------------<----|   |   READ_WRITE_OP ->-
+ * |                              complete()              \-------|------|
+ * |                                                              |      |
+ * |----<------------------------<-----------------------------<--|------|
  *                                 error()
  * </pre>
  */
@@ -101,6 +101,7 @@ public class AsyncStateMachine<S> {
         TIMING_OUT(true, false, false),
         MUST_DISPATCH(true, false, true),
         DISPATCHING(true, false, true),
+        READ_WRITE_OP(true,true,false),
         ERROR(true,false,false);
 
         private boolean isAsync;
@@ -167,6 +168,16 @@ public class AsyncStateMachine<S> {
         }
     }
 
+    public synchronized void asyncOperation() {
+        if (state==AsyncState.STARTED) {
+            state = AsyncState.READ_WRITE_OP;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("asyncStateMachine.invalidAsyncState",
+                            "asyncOperation()", state));
+        }
+    }
+
     /*
      * Async has been processed. Whether or not to enter a long poll depends on
      * current state. For example, as per SRV.2.3.3.3 can now process calls to
@@ -174,7 +185,7 @@ public class AsyncStateMachine<S> {
      */
     public synchronized SocketState asyncPostProcess() {
 
-        if (state == AsyncState.STARTING) {
+        if (state == AsyncState.STARTING || state == AsyncState.READ_WRITE_OP) {
             state = AsyncState.STARTED;
             return SocketState.LONG;
         } else if (state == AsyncState.MUST_COMPLETE) {
@@ -217,6 +228,8 @@ public class AsyncStateMachine<S> {
         } else if (state == AsyncState.TIMING_OUT ||
                 state == AsyncState.ERROR) {
             state = AsyncState.MUST_COMPLETE;
+        } else if (state == AsyncState.READ_WRITE_OP) {
+            state = AsyncState.MUST_COMPLETE;
         } else {
             throw new IllegalStateException(
                     sm.getString("asyncStateMachine.invalidAsyncState",

Modified: tomcat/trunk/java/org/apache/coyote/Request.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Request.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Request.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Request.java Wed Jul  4 01:56:46 2012
@@ -20,6 +20,9 @@ package org.apache.coyote;
 import java.io.IOException;
 import java.util.HashMap;
 
+import javax.servlet.ReadListener;
+
+import org.apache.coyote.http11.AbstractInputBuffer;
 import org.apache.tomcat.util.buf.ByteChunk;
 import org.apache.tomcat.util.buf.MessageBytes;
 import org.apache.tomcat.util.buf.UDecoder;
@@ -136,6 +139,34 @@ public final class Request {
     private int available = 0;
 
     private final RequestInfo reqProcessorMX=new RequestInfo(this);
+
+
+    // ------------------------------------------------------------- TODO SERVLET 3.1 IN PROGRESS
+
+    protected volatile ReadListener listener;
+
+    public ReadListener getReadListener() {
+        return listener;
+    }
+
+    public void setReadListener(ReadListener listener) {
+        //TODO SERVLET 3.1 is it allowed to switch from non block to blocking?
+        setBlocking(listener==null);
+        this.listener = listener;
+    }
+
+    protected volatile boolean blocking = true;
+
+    public boolean isBlocking() {
+        return blocking;
+    }
+
+    public void setBlocking(boolean blocking) throws IllegalStateException {
+        @SuppressWarnings("rawtypes")
+        AbstractInputBuffer buf = (AbstractInputBuffer)inputBuffer;
+        if (!blocking && !buf.supportsNonBlocking()) throw new IllegalStateException();
+        this.blocking = blocking;
+    }
     // ------------------------------------------------------------- Properties
 
 
@@ -505,6 +536,9 @@ public final class Request {
         remoteUser.recycle();
         authType.recycle();
         attributes.clear();
+
+        blocking = true;
+        listener = null;
     }
 
     // -------------------- Info  --------------------

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractInputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractInputBuffer.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractInputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractInputBuffer.java Wed Jul  4 01:56:46 2012
@@ -162,6 +162,9 @@ public abstract class AbstractInputBuffe
     protected int lastActiveFilter;
 
 
+    // ------------------------------------------------------------- TODO SERVLET 3.1 IN PROGRESS
+    public abstract boolean supportsNonBlocking();
+
     // ------------------------------------------------------------- Properties
 
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Wed Jul  4 01:56:46 2012
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 import java.nio.channels.SelectionKey;
 
 import javax.net.ssl.SSLEngine;
+import javax.servlet.ReadListener;
 
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.RequestInfo;
@@ -101,6 +102,19 @@ public class Http11NioProcessor extends 
      */
     protected SocketWrapper<NioChannel> socket = null;
 
+    /**
+     * TODO SERVLET 3.1
+     */
+    protected ReadListener readListener;
+
+    public ReadListener getReadListener() {
+        return readListener;
+    }
+
+    public void setReadListener(ReadListener listener) {
+        readListener = listener;
+    }
+
 
     // --------------------------------------------------------- Public Methods
 
@@ -497,6 +511,21 @@ public class Http11NioProcessor extends 
                 ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
                         SocketStatus.OPEN, true);
             }
+        } else if (actionCode == ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
+            asyncStateMachine.asyncOperation();
+        } else if (actionCode == ActionCode.SET_READ_LISTENER) {
+            ReadListener listener = (ReadListener)param;
+            request.setReadListener(listener);
+        } else if (actionCode == ActionCode.NB_READ_INTEREST) {
+            if (socket==null || socket.getSocket().getAttachment(false)==null) {
+                return;
+            }
+            RequestInfo rp = request.getRequestProcessor();
+            if (rp.getStage() == org.apache.coyote.Constants.STAGE_SERVICE) {
+                NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
+                attach.interestOps(attach.interestOps() | SelectionKey.OP_READ);
+            }
+
         }
     }
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java Wed Jul  4 01:56:46 2012
@@ -91,6 +91,8 @@ public class InternalAprInputBuffer exte
 
     // --------------------------------------------------------- Public Methods
 
+
+
     /**
      * Recycle the input buffer. This should be called when closing the
      * connection.
@@ -102,6 +104,13 @@ public class InternalAprInputBuffer exte
     }
 
 
+    @Override
+    public boolean supportsNonBlocking() {
+        //TODO SERVLET 3.1
+        return false;
+    }
+
+
     /**
      * Read the request line. This function is meant to be used during the
      * HTTP request header parsing. Do NOT attempt to read the request body

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java Wed Jul  4 01:56:46 2012
@@ -442,6 +442,12 @@ public class InternalInputBuffer extends
 
     }
 
+    @Override
+    public boolean supportsNonBlocking() {
+        //TODO SERVLET 3.1
+        return false;
+    }
+
 
     @Override
     public void recycle() {

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Wed Jul  4 01:56:46 2012
@@ -169,6 +169,49 @@ public class InternalNioInputBuffer exte
 
     // --------------------------------------------------------- Public Methods
 
+
+    @Override
+    public boolean supportsNonBlocking() {
+        return true;
+    }
+
+
+    @Override
+    public int available() {
+
+        int available = super.available();
+        if (available>0) {
+            return available;
+        }
+
+        available = Math.max(lastValid - pos, 0);
+        if (available>0) {
+            return available;
+        }
+        try {
+            available = nbRead();
+        }catch (IOException x) {
+            //TODO SERVLET 3.1 -
+            //we should not swallow this exception
+
+            if (log.isDebugEnabled()) {
+                log.debug("Unable to issue non blocking read.", x);
+            }
+        }
+        return available;
+    }
+
+    /**
+     * Issues a non blocking read
+     * @return int - nr of bytes read
+     * @throws IOException
+     */
+    public int nbRead() throws IOException {
+        return readSocket(true,false);
+    }
+
+
+
     /**
      * Recycle the input buffer. This should be called when closing the
      * connection.

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed Jul  4 01:56:46 2012
@@ -1244,7 +1244,7 @@ public class NioEndpoint extends Abstrac
                                         processSocket(channel, SocketStatus.DISCONNECT, true);
                                 } else {
                                     //future placement of a WRITE notif
-                                    if (!processSocket(channel, SocketStatus.OPEN, true))
+                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE, true))
                                         processSocket(channel, SocketStatus.DISCONNECT, true);
                                 }
                             } else {
@@ -1253,8 +1253,23 @@ public class NioEndpoint extends Abstrac
                         } else {
                             //later on, improve latch behavior
                             if ( isWorkerAvailable() ) {
-                                unreg(sk, attachment,sk.readyOps());
-                                boolean close = (!processSocket(channel, null, true));
+
+                                boolean readAndWrite = sk.isReadable() && sk.isWritable();
+                                reg(sk, attachment, 0);
+                                if (readAndWrite) {
+                                    //remember the that we want to know about write too
+                                    attachment.interestOps(SelectionKey.OP_WRITE);
+                                }
+                                //read goes before write
+                                if (sk.isReadable()) {
+                                    //read notification
+                                    if (!processSocket(channel, SocketStatus.OPEN, true))
+                                        close = true;
+                                } else {
+                                    //future placement of a WRITE notif
+                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE, true))
+                                        close = true;
+                                }
                                 if (close) {
                                     cancelledKey(sk,SocketStatus.DISCONNECT);
                                 }
@@ -1652,11 +1667,10 @@ public class NioEndpoint extends Abstrac
                                     (KeyAttachment) key.attachment(),
                                     status);
                         }
-
+                        KeyAttachment ka = (KeyAttachment)key.attachment();
                         if (state == SocketState.CLOSED) {
                             // Close socket and pool
                             try {
-                                KeyAttachment ka = (KeyAttachment) key.attachment();
                                 if (ka!=null) ka.setComet(false);
                                 socket.getPoller().cancelledKey(key, SocketStatus.ERROR);
                                 nioChannels.offer(socket);
@@ -1666,6 +1680,9 @@ public class NioEndpoint extends Abstrac
                             }catch ( Exception x ) {
                                 log.error("",x);
                             }
+                        } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) {
+                            //we are async, and we are interested in operations
+                            ka.getPoller().add(socket, ka.interestOps());
                         }
                     } else if (handshake == -1 ) {
                         KeyAttachment ka = null;

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketStatus.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketStatus.java?rev=1357039&r1=1357038&r2=1357039&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketStatus.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketStatus.java Wed Jul  4 01:56:46 2012
@@ -23,5 +23,5 @@ package org.apache.tomcat.util.net;
  * @author remm
  */
 public enum SocketStatus {
-    OPEN, STOP, TIMEOUT, DISCONNECT, ERROR
+    OPEN, OPEN_WRITE, STOP, TIMEOUT, DISCONNECT, ERROR
 }

Added: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1357039&view=auto
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (added)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Wed Jul  4 01:56:46 2012
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.nonblocking;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
+import javax.servlet.ReadListener;
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.catalina.Wrapper;
+import org.apache.catalina.core.StandardContext;
+import org.apache.catalina.startup.BytesStreamer;
+import org.apache.catalina.startup.TesterServlet;
+import org.apache.catalina.startup.Tomcat;
+import org.apache.catalina.startup.TomcatBaseTest;
+import org.apache.coyote.http11.Http11NioProtocol;
+import org.apache.tomcat.util.buf.ByteChunk;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestNonBlockingAPI extends TomcatBaseTest {
+
+    @Override
+    protected String getProtocol() {
+        return Http11NioProtocol.class.getName();
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @Test
+    public void testOne() throws Exception {
+        // Configure a context with digest auth and a single protected resource
+        Tomcat tomcat = getTomcatInstance();
+        // Must have a real docBase - just use temp
+        StandardContext ctx = (StandardContext)
+            tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+
+        NBTesterServlet servlet = new NBTesterServlet();
+        String servletName = NBTesterServlet.class.getName();
+        Wrapper servletWrapper = tomcat.addServlet(ctx, servletName, servlet);
+        ctx.addServletMapping("/", servletName);
+
+        tomcat.start();
+
+        Map<String,List<String>> resHeaders= new HashMap<String, List<String>>();
+        int rc = postUrl(true, new DataWriter(),"http://localhost:" + getPort() + "/", new ByteChunk(),resHeaders, null);
+        Assert.assertEquals(HttpServletResponse.SC_OK, rc);
+    }
+
+
+    public static class DataWriter implements BytesStreamer {
+        final int max = 5;
+        int count = 0;
+        byte[] b = "WANTMORE".getBytes();
+        byte[] f = "FINISHED".getBytes();
+        @Override
+        public int getLength() {
+            return b.length * max;
+        }
+
+        @Override
+        public int available() {
+            if (count<max) {
+                return b.length;
+            } else {
+                return 0;
+            }
+        }
+
+        @Override
+        public byte[] next() {
+            if (count < max) {
+                if (count>0) try {Thread.sleep(6000);}catch(Exception x){}
+                count++;
+                if (count<max)
+                    return b;
+                else
+                    return f;
+            } else {
+                return null;
+            }
+        }
+
+    }
+
+    @WebServlet(asyncSupported=true)
+    public static class NBTesterServlet extends TesterServlet {
+
+        @Override
+        protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+            //step 1 - start async
+            AsyncContext actx = req.startAsync();
+            actx.setTimeout(Long.MAX_VALUE);
+            actx.addListener(new AsyncListener() {
+
+                @Override
+                public void onTimeout(AsyncEvent event) throws IOException {
+                    System.out.println("onTimeout");
+
+                }
+
+                @Override
+                public void onStartAsync(AsyncEvent event) throws IOException {
+                    System.out.println("onStartAsync");
+
+                }
+
+                @Override
+                public void onError(AsyncEvent event) throws IOException {
+                    System.out.println("onError");
+
+                }
+
+                @Override
+                public void onComplete(AsyncEvent event) throws IOException {
+                    System.out.println("onComplete");
+
+                }
+            });
+            //step 2 - notify on read
+            ServletInputStream in = req.getInputStream();
+            ReadListener rlist = new TestReadListener(actx);
+            in.setReadListener(rlist);
+
+            while (in.isReady()) {
+                rlist.onDataAvailable();
+            }
+            //step 3 - notify that we wish to read
+            //ServletOutputStream out = resp.getOutputStream();
+            //out.setWriteListener(new TestWriteListener(actx));
+
+        }
+
+        @Override
+        protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+            doGet(req,resp);
+        }
+
+        private class TestReadListener implements ReadListener {
+            AsyncContext ctx;
+
+            public TestReadListener(AsyncContext ctx) {
+                this.ctx = ctx;
+            }
+
+            @Override
+            public void onDataAvailable() {
+                try {
+                    ServletInputStream in = ctx.getRequest().getInputStream();
+                    int avail=0;
+                    String s = "";
+                    while ((avail=in.dataAvailable()) > 0) {
+                        byte[] b = new byte[avail];
+                        in.read(b);
+                        s += new String(b);
+                    }
+                    System.out.println(s);
+                    if ("FINISHED".equals(s)) {
+                        ctx.complete();
+                        ctx.getResponse().getWriter().print("OK");
+                    } else {
+                        in.isReady();
+                    }
+                }catch (Exception x) {
+                    x.printStackTrace();
+                    ctx.complete();
+                }
+
+            }
+
+            @Override
+            public void onAllDataRead() {
+                System.out.println("onAllDataRead");
+
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                System.out.println("onError");
+                throwable.printStackTrace();
+
+            }
+        }
+
+        private class TestWriteListener implements WriteListener {
+
+            AsyncContext ctx;
+
+            public TestWriteListener(AsyncContext ctx) {
+                this.ctx = ctx;
+            }
+
+            @Override
+            public void onWritePossible() {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // TODO Auto-generated method stub
+
+            }
+
+        }
+
+
+
+    }
+
+
+    private static class StockQuotes {
+        public StockQuotes() {
+            super();
+        }
+        Random r = new Random(System.currentTimeMillis());
+
+        public String getNextQuote() {
+            return String.format("VMW: $%10.0f", r.nextDouble());
+        }
+
+
+
+    }
+
+
+}

Propchange: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org