You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2007/12/04 01:06:31 UTC

svn commit: r600737 - in /tomcat/sandbox/gdev6x: java/org/apache/catalina/ java/org/apache/catalina/connector/ java/org/apache/catalina/core/ java/org/apache/coyote/http11/ java/org/apache/tomcat/util/net/ webapps/docs/ webapps/docs/config/

Author: fhanik
Date: Mon Dec  3 16:06:24 2007
New Revision: 600737

URL: http://svn.apache.org/viewvc?rev=600737&view=rev
Log:
implemented buffered non blocking write for comet events

Modified:
    tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java
    tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java
    tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java
    tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java
    tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java
    tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java
    tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
    tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/sandbox/gdev6x/webapps/docs/aio.xml
    tomcat/sandbox/gdev6x/webapps/docs/changelog.xml
    tomcat/sandbox/gdev6x/webapps/docs/config/http.xml

Modified: tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java Mon Dec  3 16:06:24 2007
@@ -131,20 +131,22 @@
      * client a notice that the server has no more data to send as part of this
      * request. The servlet should perform any needed cleanup as if it had recieved
      * an END or ERROR event. 
-     * Invoking this method during a event, will cause the session to close
+     * Invoking this method during a event, will cause the Comet session to close
      * immediately after the event method has finished.
-     * Invoking this method asynchrously will not cause the session to close
-     * until another event occurred, most likely a timeout. 
-     * If you wish to signal to the container 
-     * that the session should end sooner rather than later when this method is invoked 
-     * asycnhronously, then issue a 
-     * register(OP_CALLBACK) immediately after this method has been invoked.
+     * Invoking this method asynchrously will cause the Comet session to end after the 
+     * END event has been processed
      * 
      * @see #register(int)
      */
     public void close() throws IOException;
     
     /**
+     * Returns true if #close() has been invoked
+     * @return boolean
+     */
+    public boolean isClosed();
+    
+    /**
      * Sets the timeout for this Comet connection. Please NOTE, that the implementation 
      * of a per connection timeout is OPTIONAL and MAY NOT be implemented.<br/>
      * This method sets the timeout in milliseconds of idle time on the connection.
@@ -171,11 +173,14 @@
      * a) Blocking IO - standard servlet usage<br/>
      * b) Register for READ events when data arrives<br/>
      * Tomcat Comet allows you to configure for additional options:<br/>
-     * the <code>configureBlocking(false)</code> bit signals whether writing and reading from the request 
-     * or writing to the response will be non blocking.<br/>
-     * the <code>configureBlocking(true)</code> bit signals the container you wish for read and write to be done in a blocking fashion
-     * @param blocking - true to make read and writes blocking
-     * @throws IllegalStateException - if this method is invoked outside of the BEGIN event
+     * the <code>configureBlocking(false)</code> bit signals whether writing to the response will be non blocking.<br/>
+     * the <code>configureBlocking(true)</code> bit signals the container you wish for read and write to be done in a blocking fashion<br/>
+     * when parameter is set to false, writes will be buffered and dispatched to the servlet container 
+     * to complete the write asynchronously. The size of the write buffer can be configured.
+     * If ServletRequest.(getInputStream/getWriter) is invoked with more data than 
+     * it can handle, an IO Exception will be thrown.
+     * @param blocking - true to make writes blocking, false to make writes non blocking
+     * @throws IllegalStateException - if this method is invoked outside of the BEGIN event or if blocking has already been configured
      * @see #isReadable()
      * @see #isWriteable()
      */
@@ -188,10 +193,10 @@
     public boolean isBlocking();
     
     /**
-     * OP_CALLBACK - receive a CALLBACK event from the container
+     * OP_CALLBACK - receive a CALLBACK event from the container, on a Tomcat worker thread
      * OP_READ - receive a READ event when the connection has data to be read
      * OP_WRITE - receive a WRITE event when the connection is able to receive data to be written
-     * @see #register(int)
+     * @see #interestOps(int)
      */
     public static class CometOperation {
         //currently map these to the same values as org.apache.tomcat.util.net.PollerInterest
@@ -204,6 +209,7 @@
      * Registers the Comet connection with the container for IO and event notifications.
      * Each time this method is invoked, the operations are reset to the operations parameter value.
      * To unregister an operation, simple do interestOps(interestOps() & (~CometOperation.OP_WRITE))
+     * This method can be invoked synchronously or asynchronously (by a non Tomcat worker thread) to change the operations
      * @param operations
      * @throws IllegalStateException - if you are trying to register with a socket that already is registered
      * or if the operation you are trying to register is invalid.
@@ -243,5 +249,5 @@
      * @return boolean
      */
     public boolean isReadable();
-
+    
 }

Modified: tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java Mon Dec  3 16:06:24 2007
@@ -83,7 +83,13 @@
      * Blocking or not blocking
      */
     protected boolean blocking = true;
-
+    
+    /**
+     * Closed?
+     */
+    protected boolean closed = false;
+    
+    
     // --------------------------------------------------------- Public Methods
 
     /**
@@ -94,6 +100,7 @@
         response = null;
         blocking = true;
         cometOperations = 0;
+        closed = false;
     }
 
     public void setEventType(EventType eventType) {
@@ -104,12 +111,20 @@
         this.eventSubType = eventSubType;
     }
     
+    public boolean isClosed() {
+        return closed;
+    }
+    
     public void close() throws IOException {
+        if (!closed) closed = true;
         if (request == null) {
             throw new IllegalStateException(sm.getString("cometEvent.nullRequest"));
         }
         request.setComet(false);
         response.finishResponse();
+        //if this is a worker thread, the comet operation will be reset
+        //otherwise, we are signaling to end the request
+        interestOps(CometEvent.CometOperation.OP_CALLBACK);
     }
 
     public EventSubType getEventSubType() {

Modified: tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java Mon Dec  3 16:06:24 2007
@@ -213,8 +213,8 @@
                     try {
                         SystemLogHandler.startCapture();
                         if (comet) {
-                            filterChain.doFilterEvent(request.getEvent());
                             request.setComet(true);
+                            filterChain.doFilterEvent(request.getEvent());
                         } else {
                             filterChain.doFilter(request.getRequest(), 
                                     response.getResponse());
@@ -227,8 +227,8 @@
                     }
                 } else {
                     if (comet) {
-                        filterChain.doFilterEvent(request.getEvent());
                         request.setComet(true);
+                        filterChain.doFilterEvent(request.getEvent());
                     } else {
                         filterChain.doFilter
                             (request.getRequest(), response.getResponse());

Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java Mon Dec  3 16:06:24 2007
@@ -692,6 +692,14 @@
     public int getSocketBuffer() {
         return socketBuffer;
     }
+    
+    public void setBufferedWriteSize(int size) {
+        outputBuffer.setBufferedWriteSize(size);
+    }
+    
+    public int getBufferedWriteSize() {
+        return outputBuffer.getBufferedWriteSize();
+    }
 
     /**
      * Set the upload timeout.
@@ -744,20 +752,40 @@
     public SocketState event(SocketStatus status)
         throws IOException {
 
-        RequestInfo rp = request.getRequestProcessor();
+        NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) socket.getAttachment(false);
+        if (status == SocketStatus.OPEN_WRITE) {
+            //flush out buffered write data
+            if (outputBuffer.hasDataToWrite()) {
+                int cnt = 0;
+                do {
+                    cnt = outputBuffer.flushBuffer(false);
+                }while (cnt>0);
+            }
+            //return if we have more data to write
+            if (outputBuffer.hasDataToWrite()) return SocketState.LONG;
+            
+            //return if the comet processor wasn't registered for WRITE
+            if (attach!=null && (attach.getCometOps()&PollerInterest.WRITE)!=PollerInterest.WRITE) {
+                return SocketState.LONG;
+            }
+        }
 
+        RequestInfo rp = request.getRequestProcessor();
         try {
             rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
             error = !adapter.event(request, response, status);
             if ( !error ) {
-                NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
-                if (attach != null) {
-                    attach.setComet(comet);
-                    if (!comet) {
-                        //reset the timeout
-                        attach.setTimeout(endpoint.getSocketProperties().getSoTimeout());
-                    }
+                try {
+                    
+                    if (attach != null) {
+                        attach.setComet(comet);
+                        if (!comet) {
+                            //reset the timeout
+                            attach.setTimeout(endpoint.getSocketProperties().getSoTimeout());
+                        }
 
+                    }
+                } catch (Exception ex) {
                 }
             }
         } catch (InterruptedIOException e) {
@@ -992,16 +1020,6 @@
         localName = null;
         remotePort = -1;
         localPort = -1;
-        //fix the synchronization scenario due to 
-        //dual comet flags.
-        //while the response/request
-        //might already be recycled, this circumvents the bug
-        //and should not be an expensive operation
-        //however, this is a TODO and FIXME
-        //as it would be better coordinate the recycling of the request/response
-        //instead
-        response.recycle();
-        request.recycle();
     }
 
 
@@ -1242,10 +1260,11 @@
                 socket.getPoller().cometInterest(socket);
         } else if (actionCode == ActionCode.ACTION_COMET_CONFIGURE_BLOCKING) {
             MutableBoolean bool = (MutableBoolean)param;
-            if ( bool.get() ) throw new IllegalStateException("Not yet implemented");
             RequestInfo rp = request.getRequestProcessor();
             if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE )
                 throw new IllegalStateException("Can only be configured during an event.");
+            inputBuffer.setBlocking(bool.get());
+            outputBuffer.setBlocking(bool.get());
         } else if (actionCode == ActionCode.ACTION_COMET_READABLE) {
             MutableBoolean bool = (MutableBoolean)param;
             try {

Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java Mon Dec  3 16:06:24 2007
@@ -39,6 +39,7 @@
 import org.apache.tomcat.util.net.NioChannel;
 import org.apache.tomcat.util.net.NioEndpoint;
 import org.apache.tomcat.util.net.NioEndpoint.Handler;
+import org.apache.tomcat.util.net.PollerInterest;
 import org.apache.tomcat.util.net.SSLImplementation;
 import org.apache.tomcat.util.net.SecureNioChannel;
 import org.apache.tomcat.util.net.SocketStatus;
@@ -219,7 +220,7 @@
     private int socketCloseDelay=-1;
     private boolean disableUploadTimeout = true;
     private int socketBuffer = 9000;
-    
+    private int bufferedWriteSize = 64*1024;
     private Adapter adapter;
     private Http11ConnectionHandler cHandler;
 
@@ -533,6 +534,10 @@
         this.processorCache = processorCache;
     }
 
+    public void setBufferedWriteSize(int bufferedWriteSize) {
+        this.bufferedWriteSize = bufferedWriteSize;
+    }
+
     public void setOomParachute(int oomParachute) {
         ep.setOomParachute(oomParachute);
         setAttribute("oomParachute",oomParachute);
@@ -682,7 +687,9 @@
                         if (log.isDebugEnabled()) log.debug("Keeping processor["+result);
                         //add correct poller events here based on Comet stuff
                         NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
-                        socket.getPoller().add(socket,att.getCometOps());
+                        int ops = att.getCometOps();
+                        if (result.outputBuffer.hasDataToWrite()) ops = ops|PollerInterest.WRITE;
+                        socket.getPoller().add(socket,ops);
                     }
                 }
             }
@@ -725,7 +732,9 @@
                     connections.put(socket, processor);
                     if (processor.comet) {
                         NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
-                        socket.getPoller().add(socket,att.getCometOps());
+                        int ops = att.getCometOps();
+                        if (processor.outputBuffer.hasDataToWrite()) ops = ops|PollerInterest.WRITE;
+                        socket.getPoller().add(socket,ops);
                     } else {
                         socket.getPoller().add(socket);
                     }
@@ -775,6 +784,7 @@
             processor.setCompressableMimeTypes(proto.compressableMimeTypes);
             processor.setRestrictedUserAgents(proto.restrictedUserAgents);
             processor.setSocketBuffer(proto.socketBuffer);
+            processor.setBufferedWriteSize(proto.bufferedWriteSize);
             processor.setMaxSavePostSize(proto.maxSavePostSize);
             processor.setServer(proto.server);
             register(processor);
@@ -842,6 +852,10 @@
 
     public int getProcessorCache() {
         return processorCache;
+    }
+
+    public int getBufferedWriteSize() {
+        return bufferedWriteSize;
     }
 
     public int getOomParachute() {

Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java Mon Dec  3 16:06:24 2007
@@ -28,9 +28,9 @@
 import org.apache.tomcat.util.buf.MessageBytes;
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioEndpoint;
 import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.res.StringManager;
-import org.apache.tomcat.util.net.NioEndpoint;
 
 /**
  * Implementation of InputBuffer which provides HTTP request header parsing as
@@ -188,7 +188,10 @@
      */
     protected int lastActiveFilter;
 
-
+    /**
+     * Flag used only for Comet requests/responses
+     */
+    protected boolean blocking = true;
     // ------------------------------------------------------------- Properties
 
 
@@ -206,6 +209,10 @@
         return socket;
     }
 
+    public boolean isBlocking() {
+        return blocking;
+    }
+
     public void setSelectorPool(NioSelectorPool pool) { 
         this.pool = pool;
     }
@@ -283,6 +290,10 @@
         this.swallowInput = swallowInput;
     }
 
+    public void setBlocking(boolean blocking) {
+        this.blocking = blocking;
+    }
+
     // --------------------------------------------------------- Public Methods
     /**
      * Returns true if there are bytes available from the socket layer
@@ -328,7 +339,7 @@
         parsingRequestLineQPos = -1;
         headerData.recycle();
         swallowInput = true;
-
+        blocking = true;
     }
 
 
@@ -373,7 +384,7 @@
         parsingRequestLineQPos = -1;
         headerData.recycle();
         swallowInput = true;
-
+        blocking = true;
     }
 
 
@@ -889,7 +900,10 @@
             throws IOException {
 
             if (pos >= lastValid) {
-                if (!fill(true,true)) //read body, must be blocking, as the thread is inside the app
+                //since the filters are not stateful
+                //we can't issue non blocking reads. 
+                //It simply doesn't work.
+                if (!fill(true,true)) 
                     return -1;
             }
 

Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Mon Dec  3 16:06:24 2007
@@ -25,6 +25,7 @@
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.OutputBuffer;
 import org.apache.coyote.Response;
+import org.apache.tomcat.util.MutableInteger;
 import org.apache.tomcat.util.buf.ByteChunk;
 import org.apache.tomcat.util.buf.CharChunk;
 import org.apache.tomcat.util.buf.MessageBytes;
@@ -34,8 +35,7 @@
 import org.apache.tomcat.util.net.NioEndpoint;
 import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.res.StringManager;
-import java.io.EOFException;
-import org.apache.tomcat.util.MutableInteger;
+import java.nio.BufferOverflowException;
 
 /**
  * Output buffer.
@@ -77,8 +77,6 @@
         } else {
             bbufLimit = (headerBufferSize / 1500 + 1) * 1500;
         }
-        //bbuf = ByteBuffer.allocateDirect(bbufLimit);
-
         outputStreamOutputBuffer = new SocketOutputBuffer();
 
         filterLibrary = new OutputFilter[0];
@@ -183,6 +181,31 @@
      */
     protected int lastActiveFilter;
     
+    /**
+     * Flag used only for Comet requests/responses
+     */
+    protected boolean blocking = true;
+    
+    /**
+     * Track if the byte buffer is flipped
+     */
+    protected boolean flipped = false;
+    
+    /**
+     * For "non-blocking" writes use an external buffer
+     */
+    protected ByteBuffer bufferedWrite = null;
+    
+    /**
+     * The max size of the buffered write buffer
+     */
+    protected int bufferedWriteSize = 64*1024; //64k default write buffer 
+    
+    /**
+     * track if buffered buffer is flipped
+     */
+    protected boolean bufflipped = false;
+    
     // ------------------------------------------------------------- Properties
 
 
@@ -193,6 +216,19 @@
         this.socket = socket;
     }
 
+    public void setBlocking(boolean blocking) {
+        this.blocking = blocking;
+        bufflipped = false;
+        if (blocking) 
+            bufferedWrite = null;
+        else
+            bufferedWrite = ByteBuffer.allocate(bufferedWriteSize);
+    }
+
+    public void setBufferedWriteSize(int bufferedWriteSize) {
+        this.bufferedWriteSize = bufferedWriteSize;
+    }
+
     /**
      * Get the underlying socket input stream.
      */
@@ -200,6 +236,36 @@
         return socket;
     }
 
+    public boolean isBlocking() {
+        return blocking;
+    }
+
+    public ByteBuffer getBufferedWrite() {
+        return bufferedWrite;
+    }
+    
+    public boolean hasBufferedData() {
+        if (getBufferedWrite()!=null) {
+            if (bufflipped) return getBufferedWrite().hasRemaining();
+            else return getBufferedWrite().position()>0;
+        }else {
+            return false;
+        }
+    }
+    
+    public boolean hasDataToWrite() {
+        if (!hasBufferedData()) {
+            if (flipped) return socket.getBufHandler().getWriteBuffer().hasRemaining();
+            else return socket.getBufHandler().getWriteBuffer().position()>0;
+        }else {
+            return true;
+        }
+    }
+
+    public int getBufferedWriteSize() {
+        return bufferedWriteSize;
+    }
+
     public void setSelectorPool(NioSelectorPool pool) { 
         this.pool = pool;
     }
@@ -286,19 +352,14 @@
      */
     public void flush()
         throws IOException {
-
         if (!committed) {
 
             // Send the connector a request for commit. The connector should
             // then validate the headers, send them (using sendHeader) and 
             // set the filters accordingly.
             response.action(ActionCode.ACTION_COMMIT, null);
-
         }
-
-        // Flush the current buffer
-        flushBuffer();
-
+        flushBuffer(isBlocking());
     }
 
 
@@ -322,6 +383,10 @@
      * connection.
      */
     public void recycle() {
+        recycle(true);
+    }
+    public void recycle(boolean clearbuf) {
+
         // Recycle filters
         for (int i = 0; i <= lastActiveFilter; i++) {
             activeFilters[i].recycle();
@@ -329,7 +394,7 @@
 
         // Recycle Request object
         response.recycle();
-        socket.getBufHandler().getWriteBuffer().clear();
+        if (clearbuf && socket!=null) socket.getBufHandler().getWriteBuffer().clear();
 
         socket = null;
         pos = 0;
@@ -337,7 +402,8 @@
         committed = false;
         finished = false;
         lastWrite.set(1);
-
+        setBlocking(true);
+        flipped = false;
     }
 
 
@@ -348,21 +414,7 @@
      * to parse the next HTTP request.
      */
     public void nextRequest() {
-
-        // Recycle Request object
-        response.recycle();
-
-        // Recycle filters
-        for (int i = 0; i <= lastActiveFilter; i++) {
-            activeFilters[i].recycle();
-        }
-
-        // Reset pointers
-        pos = 0;
-        lastActiveFilter = -1;
-        committed = false;
-        finished = false;
-
+        recycle(false);//proper pipeline support?
     }
 
 
@@ -389,14 +441,18 @@
         if (lastActiveFilter != -1)
             activeFilters[lastActiveFilter].end();
 
-        flushBuffer();
+        flushBuffer(true); //dont return upon call of close()
 
         finished = true;
 
     }
 
     public boolean isWritable() {
-        return lastWrite.get()>0;
+        if (lastWrite.get()>0) {
+            return !hasDataToWrite();
+        }else {
+            return false;
+        }
     }
     // ------------------------------------------------ HTTP/1.1 Output Methods
 
@@ -408,9 +464,8 @@
         throws IOException {
 
         if (!committed) {
-            //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0
             socket.getBufHandler() .getWriteBuffer().put(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length);    
-            writeToSocket(socket.getBufHandler() .getWriteBuffer(),true,true);
+            writeToSocket(socket.getBufHandler() .getWriteBuffer(),true,true);//ack is always blocking
         }
 
     }
@@ -424,8 +479,10 @@
      * @todo Fix non blocking write properly
      */
     private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
-        if ( flip ) bytebuffer.flip();
-
+        if ( flip ) {
+            bytebuffer.flip();
+            flipped = true;
+        }
         int written = 0;
         NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
         if ( att == null ) throw new IOException("Key must be cancelled");
@@ -440,12 +497,18 @@
             written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout, block,lastWrite);
             //make sure we are flushed 
             do {
-                if (socket.flush(true,selector,writeTimeout,lastWrite)) break;
+                //force writing of the net buffer on SSL
+                if (socket.flush(true,selector,writeTimeout,lastWrite)) break; 
             }while ( true );
         }finally { 
             if ( selector != null ) getSelectorPool().put(selector);
         }
-        if ( block ) bytebuffer.clear(); //only clear
+        if ( block || bytebuffer.remaining()==0) {
+            //blocking writes must empty the buffer
+            //and if remaining==0 then we did empty it
+            bytebuffer.clear();
+            flipped = false;
+        }
         this.total = 0;
         return written;
     } 
@@ -612,13 +675,20 @@
 
     int total = 0;
     private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
-        while (socket.getBufHandler().getWriteBuffer().remaining() < length) {
-            flushBuffer();
+        if (isBlocking()) {
+            while (socket.getBufHandler().getWriteBuffer().remaining() < length) {
+                flushBuffer(true);
+            }
+            socket.getBufHandler().getWriteBuffer().put(buf, offset, length);
+            total += length;
+            NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment) socket.getAttachment(false);
+            if (ka != null)
+                ka.access(); //prevent timeouts for just doing client writes
+        } else {
+            if (bufferedWrite.remaining()<length) throw new IOException("BufferOverflowException:Unable to fit buffered write data in buffer.");
+            if (bufflipped) throw new IOException("Invalid write attempt, previous buffered write not completed.");
+            bufferedWrite.put(buf, offset, length);
         }
-        socket.getBufHandler().getWriteBuffer().put(buf, offset, length);
-        total += length;
-        NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
-        if ( ka!= null ) ka.access();//prevent timeouts for just doing client writes
     }
 
 
@@ -753,25 +823,58 @@
 
     /**
      * Callback to write data from the buffer.
+     * @return the number of bytes written
      */
-    protected void flushBuffer()
+    protected int flushBuffer(boolean block)
         throws IOException {
-
+        int result = 0;
         //prevent timeout for async,
         SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         if (key != null) {
             NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
             attach.access();
         }
-
+        
         //write to the socket, if there is anything to write
-        if (socket.getBufHandler().getWriteBuffer().position() > 0) {
-            socket.getBufHandler().getWriteBuffer().flip();
-            writeToSocket(socket.getBufHandler().getWriteBuffer(),true, false);
+        if ((flipped && socket.getBufHandler().getWriteBuffer().remaining()>0) ||
+             (!flipped && socket.getBufHandler().getWriteBuffer().position() > 0) ) {
+            result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped);
+        } else if (bufferedWrite!=null) {
+            if ((bufflipped && bufferedWrite.remaining()>0)||(!bufflipped && bufferedWrite.position()>0)) {
+                //transfer to the socket buffer
+                if (!bufflipped) {
+                    bufferedWrite.flip();
+                    bufflipped = true;
+                }
+                transfer(bufferedWrite, socket.getBufHandler().getWriteBuffer());
+                if (bufferedWrite.remaining() == 0) {
+                    bufferedWrite.clear();
+                    bufflipped = false;
+                }
+                result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
+            }
+        }
+        return result;
+    }
+    
+    protected int transfer(ByteBuffer from, ByteBuffer to) {
+        int remaining = from.remaining();
+        int toRemaining = to.remaining();
+        if (toRemaining >= remaining) {
+            to.put(from);
+            return remaining;
+        } else {
+            int limit = from.limit();
+            int position = from.position();
+            from.limit(position + toRemaining);
+            to.put(from);
+            from.limit(limit);
+            return toRemaining;
         }
     }
 
 
+
     // ----------------------------------- OutputStreamOutputBuffer Inner Class
 
 
@@ -792,19 +895,24 @@
             int len = chunk.getLength();
             int start = chunk.getStart();
             byte[] b = chunk.getBuffer();
-            while (len > 0) {
-                int thisTime = len;
-                if (socket.getBufHandler().getWriteBuffer().position() == socket.getBufHandler().getWriteBuffer().capacity()) {
-                    flushBuffer();
-                }
-                if (thisTime > socket.getBufHandler().getWriteBuffer().remaining()) {
-                    thisTime = socket.getBufHandler().getWriteBuffer().remaining();
+            if (isBlocking()) {
+                while (len > 0) {
+                    int thisTime = len;
+                    if (socket.getBufHandler().getWriteBuffer().position() == socket.getBufHandler().getWriteBuffer().capacity()) {
+                        flushBuffer(true);
+                    }
+                    if (thisTime > socket.getBufHandler().getWriteBuffer().remaining()) {
+                        thisTime = socket.getBufHandler().getWriteBuffer().remaining();
+                    }
+                    addToBB(b,start,thisTime);
+                    len = len - thisTime;
+                    start = start + thisTime;
                 }
-                addToBB(b,start,thisTime);
-                len = len - thisTime;
-                start = start + thisTime;
+                return chunk.getLength();
+            }else {
+                addToBB(b,start,len);
+                return len;
             }
-            return chunk.getLength();
 
         }
 

Modified: tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Dec  3 16:06:24 2007
@@ -1419,8 +1419,12 @@
                     //the comet event takes care of clean up
                     //processSocket(ka.getChannel(), status, dispatch);
                     ka.setComet(false);//to avoid a loop
-                    processSocket(ka.getChannel(), status, false);//don't dispatch if the lines below are cancelling the key
-                    if (status == SocketStatus.TIMEOUT ) return; // don't close on comet timeout
+                    if (status == SocketStatus.TIMEOUT ) {
+                        processSocket(ka.getChannel(), status, true);
+                        return; // don't close on comet timeout
+                    } else {
+                        processSocket(ka.getChannel(), status, false); //don't dispatch if the lines below are cancelling the key
+                    }                    
                 }
                 handler.release(ka.getChannel());
                 if (key.isValid()) key.cancel();
@@ -1545,12 +1549,12 @@
                                 //set interest ops to 0 so we don't get multiple
                                 //invokations for both read and write on separate threads
                                 reg(sk, attachment, 0);
-                                //read goes before write
-                                if (sk.isReadable()) {
-                                    if (!processSocket(channel, SocketStatus.OPEN_READ))
+                                //write goes before write
+                                if (sk.isWritable()) {
+                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE))
                                         processSocket(channel, SocketStatus.DISCONNECT);
                                 } else {
-                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE))
+                                    if (!processSocket(channel, SocketStatus.OPEN_READ))
                                         processSocket(channel, SocketStatus.DISCONNECT);
                                 }
                             } else {

Modified: tomcat/sandbox/gdev6x/webapps/docs/aio.xml
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/webapps/docs/aio.xml?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/webapps/docs/aio.xml (original)
+++ tomcat/sandbox/gdev6x/webapps/docs/aio.xml Mon Dec  3 16:06:24 2007
@@ -865,6 +865,16 @@
   </ul>
 
   </section>
+  <section name="Non blocking buffered writes">
+  <p>
+    Another feature of Comet is that you can enable buffered non blocking writes.
+    This is not non blocking as in the NIO sense, instead tomcat will buffer your response 
+    and when you invoke response.flushBuffer() it will try to flush the buffer to the socket in 
+    a non blocking fashion. If the buffer wrote out completely, CometEvent.isWriteable() will return true,
+    if there is more in the buffer to write, CometEvent.isWriteable() will return false, and you should not attempt 
+    further writes, instead register for CometEvent.CometOperation.OP_WRITE to be notified when you can write again.
+  </p>
+  </section>
 
 </body>
 </document>

Modified: tomcat/sandbox/gdev6x/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/webapps/docs/changelog.xml?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/webapps/docs/changelog.xml (original)
+++ tomcat/sandbox/gdev6x/webapps/docs/changelog.xml Mon Dec  3 16:06:24 2007
@@ -17,6 +17,11 @@
 <body>
 <section name="Tomcat g6.xdev(unknown)">
   <subsection name="Catalina">
+    <update>
+      Implement buffered write
+      This means that a Comet servlet can write to the buffer without blocking.
+      Writing actually happens upon flushBuffer in a non blocking fashion
+    </update>
     <fix><bug>43653</bug>
       Fix for SSL buffer mixup
     </fix>

Modified: tomcat/sandbox/gdev6x/webapps/docs/config/http.xml
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/webapps/docs/config/http.xml?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/webapps/docs/config/http.xml (original)
+++ tomcat/sandbox/gdev6x/webapps/docs/config/http.xml Mon Dec  3 16:06:24 2007
@@ -556,6 +556,11 @@
            If you have an OOM outside of the Java Heap, then this parachute trick will not help.
         </p>
       </attribute>
+      <attribute name="bufferedWriteSize" required="false">
+        <p>(int) The size in bytes that should be used when Comet servlets used buffered/non blocking write logic.
+           The default is <code>64kb</code> or <code>64*1024 bytes</code>.
+        </p>
+      </attribute>
     </attributes>
   </subsection>
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org