You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/01/12 09:59:43 UTC

svn commit: r1651043 - in /tomcat/trunk: java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/upgrade/ java/org/apache/tomcat/util/net/ java/org/apache/tomcat/websocket/server/ test/org/apache/catalina/nonblocking/ test/org/apache/coyote/http11/upgrade/

Author: markt
Date: Mon Jan 12 08:59:42 2015
New Revision: 1651043

URL: http://svn.apache.org/r1651043
Log:
A further round of refactoring of writes. Primary change is the
use of socketWriteBuffer by default for all writes

Modified:
    tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
    tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
    tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Mon Jan 12 08:59:42 2015
@@ -737,6 +737,7 @@ public class AjpProcessor<S> extends Abs
                     cping = true;
                     try {
                         socketWrapper.write(true, pongMessageArray, 0, pongMessageArray.length);
+                        socketWrapper.flush(true);
                     } catch (IOException e) {
                         setErrorState(ErrorState.CLOSE_NOW, e);
                     }
@@ -1035,6 +1036,7 @@ public class AjpProcessor<S> extends Abs
         // Request more data immediately
         if (!waitingForBodyMessage) {
             socketWrapper.write(true, getBodyMessageArray, 0, getBodyMessageArray.length);
+            socketWrapper.flush(true);
             waitingForBodyMessage = true;
         }
 
@@ -1442,6 +1444,7 @@ public class AjpProcessor<S> extends Abs
         // Write to buffer
         responseMessage.end();
         socketWrapper.write(true, responseMessage.getBuffer(), 0, responseMessage.getLen());
+        socketWrapper.flush(true);
     }
 
 
@@ -1455,6 +1458,7 @@ public class AjpProcessor<S> extends Abs
         if (explicit && !finished) {
             // Send the flush message
             socketWrapper.write(true, flushMessageArray, 0, flushMessageArray.length);
+            socketWrapper.flush(true);
         }
     }
 
@@ -1490,6 +1494,7 @@ public class AjpProcessor<S> extends Abs
         } else {
             socketWrapper.write(true, endMessageArray, 0, endMessageArray.length);
         }
+        socketWrapper.flush(true);
     }
 
 
@@ -1556,6 +1561,7 @@ public class AjpProcessor<S> extends Abs
             responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
             responseMessage.end();
             socketWrapper.write(blocking, responseMessage.getBuffer(), 0, responseMessage.getLen());
+            socketWrapper.flush(blocking);
 
             len -= thisTime;
             off += thisTime;

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java Mon Jan 12 08:59:42 2015
@@ -123,6 +123,12 @@ public class UpgradeServletOutputStream
 
 
     @Override
+    public void flush() throws IOException {
+        socketWrapper.flush(listener == null);
+    }
+
+
+    @Override
     public void close() throws IOException {
         closeRequired = true;
         socketWrapper.close();
@@ -130,7 +136,7 @@ public class UpgradeServletOutputStream
 
 
     private void preWriteChecks() {
-        if (listener != null && socketWrapper.hasDataToWrite()) {
+        if (listener != null && !socketWrapper.canWrite()) {
             throw new IllegalStateException(sm.getString("upgrade.sis.write.ise"));
         }
     }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Mon Jan 12 08:59:42 2015
@@ -2509,8 +2509,7 @@ public class AprEndpoint extends Abstrac
 
 
         @Override
-        protected int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip)
-                throws IOException {
+        protected int doWrite(boolean block, boolean flip) throws IOException {
             if (closed) {
                 throw new IOException(sm.getString("apr.closed", getSocket()));
             }
@@ -2521,7 +2520,7 @@ public class AprEndpoint extends Abstrac
             readLock.lock();
             try {
                 if (getBlockingStatus() == block) {
-                    return doWriteInternal(bytebuffer, flip);
+                    return doWriteInternal(flip);
                 }
             } finally {
                 readLock.unlock();
@@ -2541,7 +2540,7 @@ public class AprEndpoint extends Abstrac
                 readLock.lock();
                 try {
                     writeLock.unlock();
-                    return doWriteInternal(bytebuffer, flip);
+                    return doWriteInternal(flip);
                 } finally {
                     readLock.unlock();
                 }
@@ -2555,10 +2554,9 @@ public class AprEndpoint extends Abstrac
         }
 
 
-        private int doWriteInternal(ByteBuffer bytebuffer, boolean flip)
-                throws IOException {
+        private int doWriteInternal(boolean flip) throws IOException {
             if (flip) {
-                bytebuffer.flip();
+                socketWriteBuffer.flip();
                 writeBufferFlipped = true;
             }
 
@@ -2571,7 +2569,7 @@ public class AprEndpoint extends Abstrac
                     if (sslOutputBuffer.remaining() == 0) {
                         // Buffer was fully written last time around
                         sslOutputBuffer.clear();
-                        transfer(bytebuffer, sslOutputBuffer);
+                        transfer(socketWriteBuffer, sslOutputBuffer);
                         sslOutputBuffer.flip();
                     } else {
                         // Buffer still has data from previous attempt to write
@@ -2585,8 +2583,9 @@ public class AprEndpoint extends Abstrac
                                 sslOutputBuffer.position() + sslWritten);
                     }
                 } else {
-                    thisTime = Socket.sendb(getSocket().longValue(), bytebuffer,
-                            bytebuffer.position(), bytebuffer.limit() - bytebuffer.position());
+                    thisTime = Socket.sendb(getSocket().longValue(),
+                            socketWriteBuffer, socketWriteBuffer.position(),
+                            socketWriteBuffer.limit() - socketWriteBuffer.position());
                 }
                 if (Status.APR_STATUS_IS_EAGAIN(-thisTime)) {
                     thisTime = 0;
@@ -2601,11 +2600,11 @@ public class AprEndpoint extends Abstrac
                             Integer.valueOf(-thisTime), getSocket(), this));
                 }
                 written += thisTime;
-                bytebuffer.position(bytebuffer.position() + thisTime);
-            } while (thisTime > 0 && bytebuffer.hasRemaining());
+                socketWriteBuffer.position(socketWriteBuffer.position() + thisTime);
+            } while (thisTime > 0 && socketWriteBuffer.hasRemaining());
 
-            if (bytebuffer.remaining() == 0) {
-                bytebuffer.clear();
+            if (socketWriteBuffer.remaining() == 0) {
+                socketWriteBuffer.clear();
                 writeBufferFlipped = false;
             }
             // If there is data left in the buffer the socket will be registered for

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Mon Jan 12 08:59:42 2015
@@ -1064,7 +1064,10 @@ public class Nio2Endpoint extends Abstra
 
         @Override
         public void close() throws IOException {
-            getSocket().close();
+            Nio2Channel socket = getSocket();
+            if (socket != null) {
+                socket.close();
+            }
         }
 
 
@@ -1107,167 +1110,128 @@ public class Nio2Endpoint extends Abstra
         }
 
 
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Overridden for NIO2 to enable a gathering write to be used to write
+         * all of the remaining data in a single additional write should a
+         * non-blocking write leave data in the buffer.
+         */
         @Override
-        public void write(boolean block, byte[] buf, int off, int len) throws IOException {
-            if (len == 0 || getSocket() == null)
-                return;
-
-            if (block) {
-                try {
-                    do {
-                        int thisTime = transfer(buf, off, len, socketWriteBuffer);
-                        len = len - thisTime;
-                        off = off + thisTime;
-                        socketWriteBuffer.flip();
-                        while (socketWriteBuffer.hasRemaining()) {
-                            if (getSocket().write(socketWriteBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
-                                throw new EOFException(sm.getString("iob.failedwrite"));
-                            }
-                        }
-                        socketWriteBuffer.clear();
-                    } while (len > 0);
-                } catch (ExecutionException e) {
-                    if (e.getCause() instanceof IOException) {
-                        throw (IOException) e.getCause();
-                    } else {
-                        throw new IOException(e);
-                    }
-                } catch (InterruptedException e) {
-                    throw new IOException(e);
-                } catch (TimeoutException e) {
-                    throw new SocketTimeoutException();
-                }
-            } else {
-                // FIXME: Possible new behavior:
-                // If there's non blocking abuse (like a test writing 1MB in a single
-                // "non blocking" write), then block until the previous write is
-                // done rather than continue buffering
-                // Also allows doing autoblocking
-                // Could be "smart" with coordination with the main CoyoteOutputStream to
-                // indicate the end of a write
-                // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
-                synchronized (writeCompletionHandler) {
-                    if (writePending.tryAcquire()) {
-                        // No pending completion handler, so writing to the main buffer
-                        // is possible
-                        int thisTime = transfer(buf, off, len, socketWriteBuffer);
-                        len = len - thisTime;
-                        off = off + thisTime;
-                        if (len > 0) {
-                            // Remaining data must be buffered
-                            addToBuffers(buf, off, len);
-                        }
-                        flush(false, true);
-                    } else {
+        protected void writeNonBlocking(byte[] buf, int off, int len) throws IOException {
+            // FIXME: Possible new behavior:
+            // If there's non blocking abuse (like a test writing 1MB in a single
+            // "non blocking" write), then block until the previous write is
+            // done rather than continue buffering
+            // Also allows doing autoblocking
+            // Could be "smart" with coordination with the main CoyoteOutputStream to
+            // indicate the end of a write
+            // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
+            synchronized (writeCompletionHandler) {
+                if (writePending.tryAcquire()) {
+                    // No pending completion handler, so writing to the main buffer
+                    // is possible
+                    int thisTime = transfer(buf, off, len, socketWriteBuffer);
+                    len = len - thisTime;
+                    off = off + thisTime;
+                    if (len > 0) {
+                        // Remaining data must be buffered
                         addToBuffers(buf, off, len);
                     }
+                    flushNonBlocking(true);
+                } else {
+                    addToBuffers(buf, off, len);
                 }
             }
         }
 
 
         @Override
-        protected int doWrite(ByteBuffer buffer, boolean block, boolean flip) throws IOException {
-            // NO-OP for NIO2 since write(boolean, byte[], int, int)  and
-            // flush(boolean, boolean) are over-ridden.
-            return 0;
+        protected int doWrite(boolean block, boolean flip) throws IOException {
+            // Only called in the non-blocking case since
+            // writeNonBlocking(byte[], int, int) and flush(boolean, boolean)
+            // are over-ridden.
+
+            int result = -1;
+            try {
+                socketWriteBuffer.flip();
+                result = socketWriteBuffer.remaining();
+                while (socketWriteBuffer.hasRemaining()) {
+                    if (getSocket().write(socketWriteBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
+                        throw new EOFException(sm.getString("iob.failedwrite"));
+                    }
+                }
+                socketWriteBuffer.clear();
+                return result;
+            } catch (ExecutionException e) {
+                if (e.getCause() instanceof IOException) {
+                    throw (IOException) e.getCause();
+                } else {
+                    throw new IOException(e);
+                }
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            } catch (TimeoutException e) {
+                throw new SocketTimeoutException();
+            }
         }
 
 
         @Override
-        public boolean flush(boolean block) throws IOException {
-            if (getError() != null) {
-                throw getError();
+        protected void flushBlocking() throws IOException {
+            // Before doing a blocking flush, make sure that any pending non
+            // blocking write has completed.
+            try {
+                if (writePending.tryAcquire(getTimeout(), TimeUnit.MILLISECONDS)) {
+                    writePending.release();
+                } else {
+                    throw new SocketTimeoutException();
+                }
+            } catch (InterruptedException e) {
+                // Ignore
             }
-            return super.flush(block);
-        }
 
+            super.flushBlocking();
+        }
 
         @Override
-        protected boolean flush(boolean block, boolean hasPermit) throws IOException {
-            if (getSocket() == null)
-                return false;
+        protected boolean flushNonBlocking() {
+            return flushNonBlocking(false);
+        }
 
-            if (block) {
-                try {
-                    if (writePending.tryAcquire(getTimeout(), TimeUnit.MILLISECONDS)) {
-                        writePending.release();
-                    } else {
-                        // TODO
-                    }
-                } catch (InterruptedException e) {
-                    // Ignore timeout
-                }
-                try {
-                    if (bufferedWrites.size() > 0) {
-                        for (ByteBufferHolder holder : bufferedWrites) {
-                            holder.flip();
-                            ByteBuffer buffer = holder.getBuf();
-                            while (buffer.hasRemaining()) {
-                                if (getSocket().write(buffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
-                                    throw new EOFException(sm.getString("iob.failedwrite"));
-                                }
-                            }
-                        }
-                        bufferedWrites.clear();
-                    }
+        private boolean flushNonBlocking(boolean hasPermit) {
+            synchronized (writeCompletionHandler) {
+                if (hasPermit || writePending.tryAcquire()) {
                     if (!writeBufferFlipped) {
                         socketWriteBuffer.flip();
                         writeBufferFlipped = true;
                     }
-                    while (socketWriteBuffer.hasRemaining()) {
-                        if (getSocket().write(socketWriteBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
-                            throw new EOFException(sm.getString("iob.failedwrite"));
+                    if (bufferedWrites.size() > 0) {
+                        // Gathering write of the main buffer plus all leftovers
+                        ArrayList<ByteBuffer> arrayList = new ArrayList<>();
+                        if (socketWriteBuffer.hasRemaining()) {
+                            arrayList.add(socketWriteBuffer);
+                        }
+                        for (ByteBufferHolder buffer : bufferedWrites) {
+                            buffer.flip();
+                            arrayList.add(buffer.getBuf());
                         }
-                    }
-                } catch (ExecutionException e) {
-                    if (e.getCause() instanceof IOException) {
-                        throw (IOException) e.getCause();
+                        bufferedWrites.clear();
+                        ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]);
+                        getSocket().write(array, 0, array.length, getTimeout(),
+                                TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler);
+                    } else if (socketWriteBuffer.hasRemaining()) {
+                        // Regular write
+                        getSocket().write(socketWriteBuffer, getTimeout(),
+                                TimeUnit.MILLISECONDS, socketWriteBuffer, writeCompletionHandler);
                     } else {
-                        throw new IOException(e);
-                    }
-                } catch (InterruptedException e) {
-                    throw new IOException(e);
-                } catch (TimeoutException e) {
-                    throw new SocketTimeoutException();
-                }
-                socketWriteBuffer.clear();
-                writeBufferFlipped = false;
-                return false;
-            } else {
-                synchronized (writeCompletionHandler) {
-                    if (hasPermit || writePending.tryAcquire()) {
-                        if (!writeBufferFlipped) {
-                            socketWriteBuffer.flip();
-                            writeBufferFlipped = true;
-                        }
-                        if (bufferedWrites.size() > 0) {
-                            // Gathering write of the main buffer plus all leftovers
-                            ArrayList<ByteBuffer> arrayList = new ArrayList<>();
-                            if (socketWriteBuffer.hasRemaining()) {
-                                arrayList.add(socketWriteBuffer);
-                            }
-                            for (ByteBufferHolder buffer : bufferedWrites) {
-                                buffer.flip();
-                                arrayList.add(buffer.getBuf());
-                            }
-                            bufferedWrites.clear();
-                            ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]);
-                            getSocket().write(array, 0, array.length, getTimeout(),
-                                    TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler);
-                        } else if (socketWriteBuffer.hasRemaining()) {
-                            // Regular write
-                            getSocket().write(socketWriteBuffer, getTimeout(),
-                                    TimeUnit.MILLISECONDS, socketWriteBuffer, writeCompletionHandler);
-                        } else {
-                            // Nothing was written
-                            writePending.release();
-                            socketWriteBuffer.clear();
-                            writeBufferFlipped = false;
-                        }
+                        // Nothing was written
+                        writePending.release();
+                        socketWriteBuffer.clear();
+                        writeBufferFlipped = false;
                     }
-                    return hasDataToWrite();
                 }
+                return hasDataToWrite();
             }
         }
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Jan 12 08:59:42 2015
@@ -1472,7 +1472,10 @@ public class NioEndpoint extends Abstrac
 
         @Override
         public void close() throws IOException {
-            getSocket().close();
+            NioChannel socket = getSocket();
+            if (socket != null) {
+                socket.close();
+            }
         }
 
 
@@ -1509,10 +1512,10 @@ public class NioEndpoint extends Abstrac
 
 
         @Override
-        protected synchronized int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip)
+        protected synchronized int doWrite(boolean block, boolean flip)
                 throws IOException {
             if (flip) {
-                bytebuffer.flip();
+                socketWriteBuffer.flip();
                 writeBufferFlipped = true;
             }
 
@@ -1525,7 +1528,7 @@ public class NioEndpoint extends Abstrac
                 // Ignore
             }
             try {
-                written = pool.write(bytebuffer, getSocket(), selector, writeTimeout, block);
+                written = pool.write(socketWriteBuffer, getSocket(), selector, writeTimeout, block);
                 // Make sure we are flushed
                 do {
                     if (getSocket().flush(true, selector, writeTimeout)) break;
@@ -1535,8 +1538,8 @@ public class NioEndpoint extends Abstrac
                     pool.put(selector);
                 }
             }
-            if (bytebuffer.remaining() == 0) {
-                bytebuffer.clear();
+            if (socketWriteBuffer.remaining() == 0) {
+                socketWriteBuffer.clear();
                 writeBufferFlipped = false;
             }
             // If there is data left in the buffer the socket will be registered for

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Mon Jan 12 08:59:42 2015
@@ -188,9 +188,9 @@ public abstract class SocketWrapperBase<
     }
 
     /**
-     * Checks to see if there is any writes pending and if there is calls
+     * Checks to see if there are any writes pending and if there are calls
      * {@link #registerWriteInterest()} to trigger a callback once the pending
-     * write has completed.
+     * writes have completed.
      * <p>
      * Note: Once this method has returned <code>false</code> it <b>MUST NOT</b>
      *       be called again until the pending write has completed and the
@@ -202,13 +202,19 @@ public abstract class SocketWrapperBase<
      *         written otherwise <code>false</code>
      */
     public boolean isReadyForWrite() {
-        boolean result = !hasDataToWrite();
+        boolean result = canWrite();
         if (!result) {
             registerWriteInterest();
         }
         return result;
     }
 
+
+    public boolean canWrite() {
+        return !writeBufferFlipped && socketWriteBuffer.hasRemaining() &&
+                bufferedWrites.size() == 0;
+    }
+
     public void addDispatch(DispatchType dispatchType) {
         synchronized (dispatches) {
             dispatches.add(dispatchType);
@@ -286,79 +292,116 @@ public abstract class SocketWrapperBase<
     public abstract void unRead(ByteBuffer input);
     public abstract void close() throws IOException;
 
+
     /**
      * Writes the provided data to the socket, buffering any remaining data if
-     * used in non-blocking mode. If any data remains in the buffers from a
-     * previous write then that data will be written before this data. It is
-     * therefore unnecessary to call flush() before calling this method.
+     * used in non-blocking mode.
      *
      * @param block <code>true<code> if a blocking write should be used,
      *                  otherwise a non-blocking write will be used
-     * @param b     The byte array containing the data to be written
+     * @param buf   The byte array containing the data to be written
      * @param off   The offset within the byte array of the data to be written
      * @param len   The length of the data to be written
      *
      * @throws IOException If an IO error occurs during the write
      */
-    public void write(boolean block, byte[] b, int off, int len) throws IOException {
-        // Always flush any data remaining in the buffers
-        boolean dataLeft = flush(block, true);
-
-        if (len == 0 || b == null) {
+    public void write(boolean block, byte[] buf, int off, int len) throws IOException {
+        if (len == 0 || buf == null || getSocket() == null) {
             return;
         }
 
-        // Keep writing until all the data is written or a non-blocking write
-        // leaves data in the buffer
-        while (!dataLeft && len > 0) {
-            int thisTime = transfer(b, off, len, socketWriteBuffer);
-            len = len - thisTime;
-            off = off + thisTime;
-            int written = doWrite(socketWriteBuffer, block, true);
-            if (written == 0) {
-                dataLeft = true;
-            } else {
-                dataLeft = flush(block, true);
-            }
+        // While the implementations for blocking and non-blocking writes are
+        // very similar they have been split into separate methods to allow
+        // sub-classes to override them individually. NIO2, for example,
+        // overrides the non-blocking write but not the blocking write.
+        if (block) {
+            writeBlocking(buf, off, len);
+        } else {
+            writeNonBlocking(buf, off, len);
         }
 
-        // Prevent timeouts for just doing client writes
+        // Prevent timeouts
         access();
+    }
 
-        if (!block && len > 0) {
-            // Remaining data must be buffered
-            addToBuffers(b, off, len);
+
+    /**
+     * Transfers the data to the socket write buffer (writing that data to the
+     * socket if the buffer fills up using a blocking write) until all the data
+     * has been transferred and space remains in the socket write buffer.
+     *
+     * @param buf   The byte array containing the data to be written
+     * @param off   The offset within the byte array of the data to be written
+     * @param len   The length of the data to be written
+     *
+     * @throws IOException If an IO error occurs during the write
+     */
+    protected void writeBlocking(byte[] buf, int off, int len) throws IOException {
+        // Note: There is an implementation assumption that if the switch from
+        //       non-blocking to blocking has been made then any pending
+        //       non-blocking writes were flushed at the time the switch
+        //       occurred.
+
+        // Keep writing until all the data has been transferred to the socket
+        // write buffer and space remains in that buffer
+        int thisTime = transfer(buf, off, len, socketWriteBuffer);
+        while (socketWriteBuffer.remaining() == 0) {
+            len = len - thisTime;
+            off = off + thisTime;
+            // TODO: There is an assumption here that the blocking write will
+            //       block until all the data is written or the write times out.
+            //       Document this assumption in the Javadoc for doWrite(),
+            //       ensure it is valid for all implementations of doWrite() and
+            //       then review all callers of doWrite() and review what
+            //       simplifications this offers.
+            doWrite(true, true);
+            thisTime = transfer(buf, off, len, socketWriteBuffer);
         }
     }
 
 
     /**
-     * Writes as much data as possible from any that remains in the buffers.
+     * Transfers the data to the socket write buffer (writing that data to the
+     * socket if the buffer fills up using a non-blocking write) until either
+     * all the data has been transferred and space remains in the socket write
+     * buffer or a non-blocking write leaves data in the socket write buffer.
      *
-     * @param block <code>true<code> if a blocking write should be used,
-     *                  otherwise a non-blocking write will be used
-     *
-     * @return <code>true</code> if data remains to be flushed after this method
-     *         completes, otherwise <code>false</code>. In blocking mode
-     *         therefore, the return value should always be <code>false</code>
+     * @param buf   The byte array containing the data to be written
+     * @param off   The offset within the byte array of the data to be written
+     * @param len   The length of the data to be written
      *
      * @throws IOException If an IO error occurs during the write
      */
-    public boolean flush(boolean block) throws IOException {
-        return flush(block, false);
+    protected void writeNonBlocking(byte[] buf, int off, int len) throws IOException {
+        if (!writeBufferFlipped) {
+            int thisTime = transfer(buf, off, len, socketWriteBuffer);
+            len = len - thisTime;
+            while (socketWriteBuffer.remaining() == 0) {
+                off = off + thisTime;
+                if (doWrite(false, !writeBufferFlipped) == 0) {
+                    break;
+                }
+                if (writeBufferFlipped) {
+                    thisTime = 0;
+                } else {
+                    thisTime = transfer(buf, off, len, socketWriteBuffer);
+                }
+                len = len - thisTime;
+            }
+        }
+
+        if (len > 0) {
+            // Remaining data must be buffered
+            addToBuffers(buf, off, len);
+        }
     }
 
 
     /**
      * Writes as much data as possible from any that remains in the buffers.
-     * This method exists for those implementations (e.g. NIO2) that need
-     * slightly different behaviour depending on if flush() was called directly
-     * or by another method in this class or a sub-class.
-     *
-     * @param block    <code>true<code> if a blocking write should be used,
-     *                     otherwise a non-blocking write will be used
-     * @param internal <code>true<code> if flush() was called by another method
-     *                     in class or sub-class
+     *
+     * @param block <code>true<code> if a blocking write should be used,
+     *                  otherwise a non-blocking write will be used
      *
      * @return <code>true</code> if data remains to be flushed after this method
      *         completes, otherwise <code>false</code>. In blocking mode
@@ -366,16 +409,57 @@ public abstract class SocketWrapperBase<
      *
      * @throws IOException If an IO error occurs during the write
      */
-    protected boolean flush(boolean block, boolean internal) throws IOException {
+    public boolean flush(boolean block) throws IOException {
+        if (getSocket() == null) {
+            return false;
+        }
 
-        // Prevent timeout for async
+        if (getError() != null) {
+            throw getError();
+        }
+
+        boolean result = false;
+        if (block) {
+            // A blocking flush will always empty the buffer.
+            flushBlocking();
+        } else {
+            result = flushNonBlocking();
+        }
+
+        // Prevent timeouts
         access();
 
+        return result;
+    }
+
+
+    protected void flushBlocking() throws IOException {
+        doWrite(true, !writeBufferFlipped);
+
+        if (bufferedWrites.size() > 0) {
+            Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
+            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
+                ByteBufferHolder buffer = bufIter.next();
+                buffer.flip();
+                while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+                    transfer(buffer.getBuf(), socketWriteBuffer);
+                    if (buffer.getBuf().remaining() == 0) {
+                        bufIter.remove();
+                    }
+                    doWrite(true, !writeBufferFlipped);
+                }
+            }
+        }
+
+    }
+
+
+    protected boolean flushNonBlocking() throws IOException {
         boolean dataLeft = hasMoreDataToFlush();
 
         // Write to the socket, if there is anything to write
         if (dataLeft) {
-            doWrite(socketWriteBuffer, block, !writeBufferFlipped);
+            doWrite(false, !writeBufferFlipped);
         }
 
         dataLeft = hasMoreDataToFlush();
@@ -385,13 +469,12 @@ public abstract class SocketWrapperBase<
             while (!hasMoreDataToFlush() && bufIter.hasNext()) {
                 ByteBufferHolder buffer = bufIter.next();
                 buffer.flip();
-                while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+                while (!hasMoreDataToFlush() && buffer.getBuf().remaining() > 0) {
                     transfer(buffer.getBuf(), socketWriteBuffer);
                     if (buffer.getBuf().remaining() == 0) {
                         bufIter.remove();
                     }
-                    doWrite(socketWriteBuffer, block, true);
-                    //here we must break if we didn't finish the write
+                    doWrite(false, !writeBufferFlipped);
                 }
             }
         }
@@ -400,8 +483,7 @@ public abstract class SocketWrapperBase<
     }
 
 
-    protected abstract int doWrite(ByteBuffer buffer, boolean block, boolean flip)
-            throws IOException;
+    protected abstract int doWrite(boolean block, boolean flip) throws IOException;
 
 
     protected void addToBuffers(byte[] buf, int offset, int length) {

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Mon Jan 12 08:59:42 2015
@@ -88,7 +88,7 @@ public class WsRemoteEndpointImplServer
             // was written
             return;
         }
-        boolean complete = true;
+        boolean complete = false;
         try {
             // If this is false there will be a call back when it is true
             while (sos.isReady()) {
@@ -103,6 +103,7 @@ public class WsRemoteEndpointImplServer
                     }
                 }
                 if (complete) {
+                    sos.flush();
                     wsWriteTimeout.unregister(this);
                     clearHandler(null, useDispatch);
                     if (close) {
@@ -117,9 +118,9 @@ public class WsRemoteEndpointImplServer
             clearHandler(ioe, useDispatch);
             close();
         }
+
         if (!complete) {
             // Async write is in progress
-
             long timeout = getSendTimeout();
             if (timeout > 0) {
                 // Register with timeout thread

Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Mon Jan 12 08:59:42 2015
@@ -221,19 +221,19 @@ public class TestNonBlockingAPI extends
                 boolean found = false;
                 for (int i = totalBodyRead; i < (totalBodyRead + line.length()); i++) {
                     if (DATA[i] != resultBytes[lineStart + i - totalBodyRead]) {
-                        int dataStart = i - 16;
+                        int dataStart = i - 64;
                         if (dataStart < 0) {
                             dataStart = 0;
                         }
-                        int dataEnd = i + 16;
+                        int dataEnd = i + 64;
                         if (dataEnd > DATA.length) {
                             dataEnd = DATA.length;
                         }
-                        int resultStart = lineStart + i - totalBodyRead - 16;
+                        int resultStart = lineStart + i - totalBodyRead - 64;
                         if (resultStart < 0) {
                             resultStart = 0;
                         }
-                        int resultEnd = lineStart + i - totalBodyRead + 16;
+                        int resultEnd = lineStart + i - totalBodyRead + 64;
                         if (resultEnd > resultString.length()) {
                             resultEnd = resultString.length();
                         }
@@ -492,25 +492,21 @@ public class TestNonBlockingAPI extends
                 @Override
                 public void onTimeout(AsyncEvent event) throws IOException {
                     log.info("onTimeout");
-
                 }
 
                 @Override
                 public void onStartAsync(AsyncEvent event) throws IOException {
                     log.info("onStartAsync");
-
                 }
 
                 @Override
                 public void onError(AsyncEvent event) throws IOException {
                     log.info("AsyncListener.onError");
-
                 }
 
                 @Override
                 public void onComplete(AsyncEvent event) throws IOException {
                     log.info("onComplete");
-
                 }
             });
             // step 2 - notify on read

Modified: tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java (original)
+++ tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java Mon Jan 12 08:59:42 2015
@@ -47,7 +47,6 @@ import static org.apache.catalina.startu
 import org.apache.catalina.Context;
 import org.apache.catalina.startup.Tomcat;
 import org.apache.catalina.startup.TomcatBaseTest;
-import org.apache.catalina.util.IOTools;
 
 public class TestUpgrade extends TomcatBaseTest {
 
@@ -234,8 +233,12 @@ public class TestUpgrade extends TomcatB
 
             try (ServletInputStream sis = connection.getInputStream();
                  ServletOutputStream sos = connection.getOutputStream()){
-
-                IOTools.flow(sis, sos);
+                byte[] buffer = new byte[8192];
+                int read;
+                while ((read = sis.read(buffer)) >= 0) {
+                    sos.write(buffer, 0, read);
+                    sos.flush();
+                }
             } catch (IOException ioe) {
                 throw new IllegalStateException(ioe);
             }
@@ -274,7 +277,7 @@ public class TestUpgrade extends TomcatB
 
         private class EchoReadListener extends NoOpReadListener {
 
-            private byte[] buffer = new byte[8096];
+            private byte[] buffer = new byte[8192];
 
             @Override
             public void onDataAvailable() {
@@ -288,6 +291,7 @@ public class TestUpgrade extends TomcatB
                                 throw new IOException("Unable to echo data. " +
                                         "isReady() returned false");
                             }
+                            sos.flush();
                         }
                     }
                 } catch (IOException ioe) {



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org