You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/01/08 14:09:55 UTC

svn commit: r1650269 - in /tomcat/trunk/java/org/apache: coyote/ajp/ coyote/http11/ coyote/http11/upgrade/ tomcat/util/net/

Author: markt
Date: Thu Jan  8 13:09:54 2015
New Revision: 1650269

URL: http://svn.apache.org/r1650269
Log:
Move writes and associated buffers to SocketWrapper for NIO. NIO2/APR likely broken at this point.

Modified:
    tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Thu Jan  8 13:09:54 2015
@@ -24,8 +24,6 @@ import java.nio.ByteBuffer;
 import java.security.NoSuchProviderException;
 import java.security.cert.CertificateFactory;
 import java.security.cert.X509Certificate;
-import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.servlet.RequestDispatcher;
@@ -44,7 +42,6 @@ import org.apache.coyote.Response;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.ExceptionUtils;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
 import org.apache.tomcat.util.buf.ByteChunk;
 import org.apache.tomcat.util.buf.HexUtils;
 import org.apache.tomcat.util.buf.MessageBytes;
@@ -183,22 +180,6 @@ public class AjpProcessor<S> extends Abs
 
 
     /**
-     * The max size of the buffered write buffer
-     */
-    private int bufferedWriteSize = 64*1024; //64k default write buffer
-
-
-    /**
-     * For "non-blocking" writes use an external set of buffers. Although the
-     * API only allows one non-blocking write at a time, due to buffering and
-     * the possible need to write HTTP headers, there may be more than one write
-     * to the OutputBuffer.
-     */
-    private final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
-            new LinkedBlockingDeque<>();
-
-
-    /**
      * Host name (used to avoid useless B2C conversion on the host name).
      */
     protected char[] hostNameC = new char[0];
@@ -605,7 +586,7 @@ public class AjpProcessor<S> extends Abs
         }
         case NB_WRITE_INTEREST: {
             AtomicBoolean isReady = (AtomicBoolean)param;
-            boolean result = bufferedWrites.size() == 0 && responseMsgPos == -1;
+            boolean result = !socketWrapper.hasDataToWrite() && responseMsgPos == -1;
             isReady.set(result);
             if (!result) {
                 registerForEvent(false, true);
@@ -647,7 +628,8 @@ public class AjpProcessor<S> extends Abs
                 asyncStateMachine.asyncOperation();
                 try {
                     if (hasDataToWrite()) {
-                        flushBufferedData();
+                        boolean blocking = (response.getWriteListener() == null);
+                        socketWrapper.flush(blocking);
                         if (hasDataToWrite()) {
                             // There is data to write but go via Response to
                             // maintain a consistent view of non-blocking state
@@ -755,7 +737,7 @@ public class AjpProcessor<S> extends Abs
                     }
                     cping = true;
                     try {
-                        output(pongMessageArray, 0, pongMessageArray.length, true);
+                        socketWrapper.write(true, pongMessageArray, 0, pongMessageArray.length);
                     } catch (IOException e) {
                         setErrorState(ErrorState.CLOSE_NOW, e);
                     }
@@ -1053,7 +1035,7 @@ public class AjpProcessor<S> extends Abs
 
         // Request more data immediately
         if (!waitingForBodyMessage) {
-            output(getBodyMessageArray, 0, getBodyMessageArray.length, true);
+            socketWrapper.write(true, getBodyMessageArray, 0, getBodyMessageArray.length);
             waitingForBodyMessage = true;
         }
 
@@ -1460,7 +1442,7 @@ public class AjpProcessor<S> extends Abs
 
         // Write to buffer
         responseMessage.end();
-        output(responseMessage.getBuffer(), 0, responseMessage.getLen(), true);
+        socketWrapper.write(true, responseMessage.getBuffer(), 0, responseMessage.getLen());
     }
 
 
@@ -1473,7 +1455,7 @@ public class AjpProcessor<S> extends Abs
         // TODO Validate the assertion above
         if (explicit && !finished) {
             // Send the flush message
-            output(flushMessageArray, 0, flushMessageArray.length, true);
+            socketWrapper.write(true, flushMessageArray, 0, flushMessageArray.length);
         }
     }
 
@@ -1505,22 +1487,13 @@ public class AjpProcessor<S> extends Abs
 
         // Add the end message
         if (getErrorState().isError()) {
-            output(endAndCloseMessageArray, 0, endAndCloseMessageArray.length, true);
+            socketWrapper.write(true, endAndCloseMessageArray, 0, endAndCloseMessageArray.length);
         } else {
-            output(endMessageArray, 0, endMessageArray.length, true);
+            socketWrapper.write(true, endMessageArray, 0, endMessageArray.length);
         }
     }
 
 
-    private int output(byte[] src, int offset, int length,
-            boolean block) throws IOException {
-        if (socketWrapper == null || socketWrapper.getSocket() == null)
-            return -1;
-
-        return socketWrapper.write(block, src, offset, length);
-    }
-
-
     private boolean available() {
         if (endOfStream) {
             return false;
@@ -1569,15 +1542,12 @@ public class AjpProcessor<S> extends Abs
         socketWrapper.access();
 
         boolean blocking = (response.getWriteListener() == null);
-        if (!blocking) {
-            flushBufferedData();
-        }
 
         int len = chunk.getLength();
         int off = 0;
 
         // Write this chunk
-        while (responseMsgPos == -1 && len > 0) {
+        while (len > 0) {
             int thisTime = len;
             if (thisTime > outputMaxChunkSize) {
                 thisTime = outputMaxChunkSize;
@@ -1586,96 +1556,18 @@ public class AjpProcessor<S> extends Abs
             responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
             responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
             responseMessage.end();
-            writeResponseMessage(blocking);
+            socketWrapper.write(blocking, responseMessage.getBuffer(), 0, responseMessage.getLen());
 
             len -= thisTime;
             off += thisTime;
         }
 
         bytesWritten += off;
-
-        if (len > 0) {
-            // Add this chunk to the buffer
-            addToBuffers(chunk.getBuffer(), off, len);
-        }
-    }
-
-
-    private void addToBuffers(byte[] buf, int offset, int length) {
-        ByteBufferHolder holder = bufferedWrites.peekLast();
-        if (holder == null || holder.isFlipped() || holder.getBuf().remaining() < length) {
-            ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
-            holder = new ByteBufferHolder(buffer, false);
-            bufferedWrites.add(holder);
-        }
-        holder.getBuf().put(buf, offset, length);
     }
 
 
     private boolean hasDataToWrite() {
-        return responseMsgPos != -1 || bufferedWrites.size() > 0;
-    }
-
-
-    private void flushBufferedData() throws IOException {
-
-        if (responseMsgPos > -1) {
-            // Must be using non-blocking IO
-            // Partially written response message. Try and complete it.
-            writeResponseMessage(false);
-        }
-
-        while (responseMsgPos == -1 && bufferedWrites.size() > 0) {
-            // Try and write any remaining buffer data
-            Iterator<ByteBufferHolder> holders = bufferedWrites.iterator();
-            ByteBufferHolder holder = holders.next();
-            holder.flip();
-            ByteBuffer buffer = holder.getBuf();
-            int initialBufferSize = buffer.remaining();
-            while (responseMsgPos == -1 && buffer.remaining() > 0) {
-                transferToResponseMsg(buffer);
-                writeResponseMessage(false);
-            }
-            bytesWritten += (initialBufferSize - buffer.remaining());
-            if (buffer.remaining() == 0) {
-                holders.remove();
-            }
-        }
-    }
-
-
-    private void transferToResponseMsg(ByteBuffer buffer) {
-
-        int thisTime = buffer.remaining();
-        if (thisTime > outputMaxChunkSize) {
-            thisTime = outputMaxChunkSize;
-        }
-
-        responseMessage.reset();
-        responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
-        buffer.get(responseMessage.getBuffer(), responseMessage.pos, thisTime);
-        responseMessage.end();
-    }
-
-
-    private void writeResponseMessage(boolean block) throws IOException {
-        int len = responseMessage.getLen();
-        int written = 1;
-        if (responseMsgPos == -1) {
-            // New message. Advance the write position to the beginning
-            responseMsgPos = 0;
-        }
-
-        while (written > 0 && responseMsgPos < len) {
-            written = output(
-                    responseMessage.getBuffer(), responseMsgPos, len - responseMsgPos, block);
-            responseMsgPos += written;
-        }
-
-        // Message fully written, reset the position for a new message.
-        if (responseMsgPos == len) {
-            responseMsgPos = -1;
-        }
+        return responseMsgPos != -1 || socketWrapper.hasDataToWrite();
     }
 
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java Thu Jan  8 13:09:54 2015
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.util.Iterator;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.coyote.ActionCode;
@@ -653,19 +652,7 @@ public abstract class AbstractOutputBuff
 
 
     public boolean hasDataToWrite() {
-        return hasMoreDataToFlush() || hasBufferedData();
-    }
-
-
-    protected boolean hasBufferedData() {
-        boolean result = false;
-        if (bufferedWrites!=null) {
-            Iterator<ByteBufferHolder> iter = bufferedWrites.iterator();
-            while (!result && iter.hasNext()) {
-                result = iter.next().hasData();
-            }
-        }
-        return result;
+        return socketWrapper.hasDataToWrite();
     }
 
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java Thu Jan  8 13:09:54 2015
@@ -417,7 +417,6 @@ public class InternalNio2OutputBuffer ex
         }
     }
 
-    @Override
     protected boolean hasBufferedData() {
         return bufferedWrites.size() > 0;
     }

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Thu Jan  8 13:09:54 2015
@@ -18,16 +18,11 @@
 package org.apache.coyote.http11;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
 
 import org.apache.coyote.Response;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
 import org.apache.tomcat.util.net.NioChannel;
-import org.apache.tomcat.util.net.NioEndpoint;
-import org.apache.tomcat.util.net.NioSelectorPool;
+import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper;
 import org.apache.tomcat.util.net.SocketWrapperBase;
 
 /**
@@ -47,25 +42,12 @@ public class InternalNioOutputBuffer ext
     }
 
 
-    /**
-     * Underlying socket.
-     */
-    private NioChannel socket;
-
-    /**
-     * Selector pool, for blocking reads and blocking writes
-     */
-    private NioSelectorPool pool;
-
-
     // --------------------------------------------------------- Public Methods
 
     @Override
     public void init(SocketWrapperBase<NioChannel> socketWrapper) {
         super.init(socketWrapper);
-        socket = socketWrapper.getSocket();
-        pool = ((NioEndpoint)socketWrapper.getEndpoint()).getSelectorPool();
-        socketWriteBuffer = socket.getBufHandler().getWriteBuffer();
+        socketWriteBuffer = socketWrapper.getSocket().getBufHandler().getWriteBuffer();
     }
 
 
@@ -77,7 +59,7 @@ public class InternalNioOutputBuffer ext
     public void recycle() {
         super.recycle();
         socketWriteBuffer.clear();
-        socket = null;
+        socketWrapper = null;
     }
 
 
@@ -89,103 +71,19 @@ public class InternalNioOutputBuffer ext
     @Override
     public void sendAck() throws IOException {
         if (!committed) {
-            socketWriteBuffer.put(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length);
-            int result = writeToSocket(socketWriteBuffer, true, true);
-            if (result < 0) {
+            addToBB(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length);
+            if (flushBuffer(true)) {
                 throw new IOException(sm.getString("iob.failedwrite.ack"));
             }
         }
     }
 
-    /**
-     *
-     * @param bytebuffer ByteBuffer
-     * @param flip boolean
-     * @return int
-     * @throws IOException
-     */
-    private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
-        if ( flip ) {
-            bytebuffer.flip();
-            writeBufferFlipped = true;
-        }
-
-        int written = 0;
-        NioEndpoint.NioSocketWrapper att = (NioEndpoint.NioSocketWrapper)socket.getAttachment();
-        if ( att == null ) throw new IOException("Key must be cancelled");
-        long writeTimeout = att.getWriteTimeout();
-        Selector selector = null;
-        try {
-            selector = pool.get();
-        } catch ( IOException x ) {
-            //ignore
-        }
-        try {
-            written = pool.write(bytebuffer, socket, selector, writeTimeout, block);
-            //make sure we are flushed
-            do {
-                if (socket.flush(true,selector,writeTimeout)) break;
-            }while ( true );
-        } finally {
-            if ( selector != null ) pool.put(selector);
-        }
-        if ( block || bytebuffer.remaining()==0) {
-            //blocking writes must empty the buffer
-            //and if remaining==0 then we did empty it
-            bytebuffer.clear();
-            writeBufferFlipped = false;
-        }
-        // If there is data left in the buffer the socket will be registered for
-        // write further up the stack. This is to ensure the socket is only
-        // registered for write once as both container and user code can trigger
-        // write registration.
-        return written;
-    }
-
 
     // ------------------------------------------------------ Protected Methods
 
     @Override
-    protected synchronized void addToBB(byte[] buf, int offset, int length)
-            throws IOException {
-
-        if (length == 0) return;
-
-        // Try to flush any data in the socket's write buffer first
-        boolean dataLeft = flushBuffer(isBlocking());
-
-        // Keep writing until all the data is written or a non-blocking write
-        // leaves data in the buffer
-        while (!dataLeft && length > 0) {
-            int thisTime = transfer(buf,offset,length,socketWriteBuffer);
-            length = length - thisTime;
-            offset = offset + thisTime;
-            int written = writeToSocket(socketWriteBuffer, isBlocking(), true);
-            if (written == 0) {
-                dataLeft = true;
-            } else {
-                dataLeft = flushBuffer(isBlocking());
-            }
-        }
-
-        // Prevent timeouts for just doing client writes
-        socketWrapper.access();
-
-        if (!isBlocking() && length > 0) {
-            // Remaining data must be buffered
-            addToBuffers(buf, offset, length);
-        }
-    }
-
-
-    private void addToBuffers(byte[] buf, int offset, int length) {
-        ByteBufferHolder holder = bufferedWrites.peekLast();
-        if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
-            ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
-            holder = new ByteBufferHolder(buffer,false);
-            bufferedWrites.add(holder);
-        }
-        holder.getBuf().put(buf,offset,length);
+    protected synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
+        socketWrapper.write(isBlocking(), buf, offset, length);
     }
 
 
@@ -194,49 +92,12 @@ public class InternalNioOutputBuffer ext
      */
     @Override
     protected boolean flushBuffer(boolean block) throws IOException {
-
-        //prevent timeout for async,
-        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-        if (key != null) {
-            NioEndpoint.NioSocketWrapper attach = (NioEndpoint.NioSocketWrapper) key.attachment();
-            attach.access();
-        }
-
-        boolean dataLeft = hasMoreDataToFlush();
-
-        //write to the socket, if there is anything to write
-        if (dataLeft) {
-            writeToSocket(socketWriteBuffer, block, !writeBufferFlipped);
-        }
-
-        dataLeft = hasMoreDataToFlush();
-
-        if (!dataLeft && bufferedWrites.size() > 0) {
-            Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
-            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
-                ByteBufferHolder buffer = bufIter.next();
-                buffer.flip();
-                while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
-                    transfer(buffer.getBuf(), socketWriteBuffer);
-                    if (buffer.getBuf().remaining() == 0) {
-                        bufIter.remove();
-                    }
-                    writeToSocket(socketWriteBuffer, block, true);
-                    //here we must break if we didn't finish the write
-                }
-            }
-        }
-
-        return hasMoreDataToFlush();
+        return socketWrapper.flush(block);
     }
 
 
     @Override
     protected void registerWriteInterest() throws IOException {
-        NioEndpoint.NioSocketWrapper att = (NioEndpoint.NioSocketWrapper)socket.getAttachment();
-        if (att == null) {
-            throw new IOException("Key must be cancelled");
-        }
-        att.getPoller().add(socket, SelectionKey.OP_WRITE);
+        ((NioSocketWrapper) socketWrapper).getPoller().add(socketWrapper.getSocket(), SelectionKey.OP_WRITE);
     }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java Thu Jan  8 13:09:54 2015
@@ -55,18 +55,10 @@ public class UpgradeServletOutputStream
 
     private volatile ClassLoader applicationLoader = null;
 
-    // Writes guarded by writeLock
-    private volatile byte[] buffer;
-    private volatile int bufferPos;
-    private volatile int bufferLimit;
-    private final int asyncWriteBufferSize;
-
 
     public UpgradeServletOutputStream(SocketWrapperBase<?> socketWrapper,
             int asyncWriteBufferSize) {
         this.socketWrapper = socketWrapper;
-        this.asyncWriteBufferSize = asyncWriteBufferSize;
-        buffer = new byte[asyncWriteBufferSize];
     }
 
 
@@ -80,7 +72,7 @@ public class UpgradeServletOutputStream
         // Make sure isReady() and onWritePossible() have a consistent view of
         // buffer and fireListener when determining if the listener should fire
         synchronized (fireListenerLock) {
-            boolean result = (bufferLimit == 0);
+            boolean result = !socketWrapper.hasDataToWrite();
             fireListener = !result;
             return result;
         }
@@ -139,7 +131,7 @@ public class UpgradeServletOutputStream
 
 
     private void preWriteChecks() {
-        if (bufferLimit != 0) {
+        if (socketWrapper.hasDataToWrite()) {
             throw new IllegalStateException(sm.getString("upgrade.sis.write.ise"));
         }
     }
@@ -153,34 +145,7 @@ public class UpgradeServletOutputStream
             // Simple case - blocking IO
             socketWrapper.write(true, b, off, len);
         } else {
-            // Non-blocking IO
-            // If the non-blocking read does not complete, doWrite() will add
-            // the socket back into the poller. The poller may trigger a new
-            // write event before this method has finished updating buffer. The
-            // writeLock sync makes sure that buffer is updated before the next
-            // write executes.
-            int written = socketWrapper.write(false, b, off, len);
-            if (written < len) {
-                if (b == buffer) {
-                    // This is a partial write of the existing buffer. Just
-                    // increment the current position
-                    bufferPos += written;
-                } else {
-                    // This is a new partial write
-                    int bytesLeft = len - written;
-                    if (bytesLeft > buffer.length) {
-                        buffer = new byte[bytesLeft];
-                    } else if (bytesLeft < asyncWriteBufferSize &&
-                            buffer.length > asyncWriteBufferSize) {
-                        buffer = new byte[asyncWriteBufferSize];
-                    }
-                    bufferPos = 0;
-                    bufferLimit = bytesLeft;
-                    System.arraycopy(b, off + written, buffer, bufferPos, bufferLimit);
-                }
-            } else {
-                bufferLimit = 0;
-            }
+            socketWrapper.write(false, b, off, len);
         }
     }
 
@@ -188,9 +153,7 @@ public class UpgradeServletOutputStream
     protected final void onWritePossible() throws IOException {
         try {
             synchronized (writeLock) {
-                if (bufferLimit > 0) {
-                    writeInternal(buffer, bufferPos, bufferLimit - bufferPos);
-                }
+                socketWrapper.flush(false);
             }
         } catch (Throwable t) {
             ExceptionUtils.handleThrowable(t);
@@ -207,7 +170,7 @@ public class UpgradeServletOutputStream
         // should fire
         boolean fire = false;
         synchronized (fireListenerLock) {
-            if (bufferLimit == 0 && fireListener) {
+            if (!socketWrapper.hasDataToWrite() && fireListener) {
                 fireListener = false;
                 fire = true;
             }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Jan  8 13:09:54 2015
@@ -2505,7 +2505,7 @@ public class AprEndpoint extends Abstrac
 
 
         @Override
-        public int write(boolean block, byte[] b, int off, int len) throws IOException {
+        public void write(boolean block, byte[] b, int off, int len) throws IOException {
 
             if (closed) {
                 throw new IOException(sm.getString("apr.closed", getSocket()));
@@ -2517,7 +2517,8 @@ public class AprEndpoint extends Abstrac
             readLock.lock();
             try {
                 if (getBlockingStatus() == block) {
-                    return doWriteInternal(b, off, len);
+                    doWriteInternal(b, off, len);
+                    return;
                 }
             } finally {
                 readLock.unlock();
@@ -2537,7 +2538,8 @@ public class AprEndpoint extends Abstrac
                 readLock.lock();
                 try {
                     writeLock.unlock();
-                    return doWriteInternal(b, off, len);
+                    doWriteInternal(b, off, len);
+                    return;
                 } finally {
                     readLock.unlock();
                 }
@@ -2610,5 +2612,12 @@ public class AprEndpoint extends Abstrac
             ((AprEndpoint) getEndpoint()).getPoller().add(
                     getSocket().longValue(), -1, read, write);
         }
+
+
+        @Override
+        public boolean flush(boolean block) throws IOException {
+            // TODO Auto-generated method stub
+            return false;
+        }
     }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan  8 13:09:54 2015
@@ -992,9 +992,8 @@ public class Nio2Endpoint extends Abstra
 
 
         @Override
-        public int write(boolean block, byte[] b, int off, int len) throws IOException {
+        public void write(boolean block, byte[] b, int off, int len) throws IOException {
             int leftToWrite = len;
-            int count = 0;
             int offset = off;
 
             while (leftToWrite > 0) {
@@ -1011,11 +1010,10 @@ public class Nio2Endpoint extends Abstra
                 if (writtenThisLoop < 0) {
                     throw new EOFException();
                 }
-                count += writtenThisLoop;
                 if (!block && writePending.availablePermits() == 0) {
                     // Prevent concurrent writes in non blocking mode,
                     // leftover data has to be buffered
-                    return count;
+                    return;
                 }
                 offset += writtenThisLoop;
                 leftToWrite -= writtenThisLoop;
@@ -1024,8 +1022,6 @@ public class Nio2Endpoint extends Abstra
                     break;
                 }
             }
-
-            return count;
         }
 
 
@@ -1070,6 +1066,12 @@ public class Nio2Endpoint extends Abstra
         public void regsiterForEvent(boolean read, boolean write) {
             // NO-OP. Appropriate handlers will already have been registered.
         }
+
+        @Override
+        public boolean flush(boolean block) throws IOException {
+            // TODO Auto-generated method stub
+            return false;
+        }
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan  8 13:09:54 2015
@@ -51,6 +51,7 @@ import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.ExceptionUtils;
 import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.tomcat.util.buf.ByteBufferHolder;
 import org.apache.tomcat.util.collections.SynchronizedQueue;
 import org.apache.tomcat.util.collections.SynchronizedStack;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
@@ -1297,7 +1298,6 @@ public class NioEndpoint extends Abstrac
     // ---------------------------------------------------- Key Attachment Class
     public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
 
-        private final int maxWrite;
         private final NioSelectorPool pool;
 
         private Poller poller = null;
@@ -1310,7 +1310,6 @@ public class NioEndpoint extends Abstrac
 
         public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
             super(channel, endpoint);
-            maxWrite = channel.getBufHandler().getWriteBuffer().capacity();
             pool = endpoint.getSelectorPool();
         }
 
@@ -1341,6 +1340,8 @@ public class NioEndpoint extends Abstrac
             }
             writeLatch = null;
             setWriteTimeout(soTimeout);
+
+            socketWriteBuffer = channel.getBufHandler().getWriteBuffer();
         }
 
         public void reset() {
@@ -1509,72 +1510,128 @@ public class NioEndpoint extends Abstrac
 
 
         @Override
-        public int write(boolean block, byte[] b, int off, int len) throws IOException {
-            int leftToWrite = len;
-            int count = 0;
-            int offset = off;
+        public void write(boolean block, byte[] b, int off, int len) throws IOException {
+            // Always flush any data remaining in the buffers
+            boolean dataLeft = flush(block);
 
-            while (leftToWrite > 0) {
-                int writeThisLoop;
-                int writtenThisLoop;
+            if (len == 0 || b == null) {
+                return;
+            }
 
-                if (leftToWrite > maxWrite) {
-                    writeThisLoop = maxWrite;
+            ByteBuffer socketWriteBuffer = getSocket().getBufHandler().getWriteBuffer();
+
+            // Keep writing until all the data is written or a non-blocking write
+            // leaves data in the buffer
+            while (!dataLeft && len > 0) {
+                int thisTime = transfer(b, off, len, socketWriteBuffer);
+                len = len - thisTime;
+                off = off + thisTime;
+                int written = writeToSocket(socketWriteBuffer, block, true);
+                if (written == 0) {
+                    dataLeft = true;
                 } else {
-                    writeThisLoop = leftToWrite;
+                    dataLeft = flush(block);
                 }
+            }
 
-                writtenThisLoop = writeInternal(block, b, offset, writeThisLoop);
-                count += writtenThisLoop;
-                offset += writtenThisLoop;
-                leftToWrite -= writtenThisLoop;
+            // Prevent timeouts for just doing client writes
+            access();
 
-                if (writtenThisLoop < writeThisLoop) {
-                    break;
+            if (!block && len > 0) {
+                // Remaining data must be buffered
+                addToBuffers(b, off, len);
+            }
+        }
+
+
+        @Override
+        public boolean flush(boolean block) throws IOException {
+
+            //prevent timeout for async,
+            SelectionKey key = getSocket().getIOChannel().keyFor(getSocket().getPoller().getSelector());
+            if (key != null) {
+                NioEndpoint.NioSocketWrapper attach = (NioEndpoint.NioSocketWrapper) key.attachment();
+                attach.access();
+            }
+
+            boolean dataLeft = hasMoreDataToFlush();
+
+            //write to the socket, if there is anything to write
+            if (dataLeft) {
+                writeToSocket(socketWriteBuffer, block, !writeBufferFlipped);
+            }
+
+            dataLeft = hasMoreDataToFlush();
+
+            if (!dataLeft && bufferedWrites.size() > 0) {
+                Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
+                while (!hasMoreDataToFlush() && bufIter.hasNext()) {
+                    ByteBufferHolder buffer = bufIter.next();
+                    buffer.flip();
+                    while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+                        transfer(buffer.getBuf(), socketWriteBuffer);
+                        if (buffer.getBuf().remaining() == 0) {
+                            bufIter.remove();
+                        }
+                        writeToSocket(socketWriteBuffer, block, true);
+                        //here we must break if we didn't finish the write
+                    }
                 }
             }
 
-            return count;
+            return hasMoreDataToFlush();
         }
 
 
-        private int writeInternal (boolean block, byte[] b, int off, int len)
-                throws IOException {
-
-            NioEndpoint.NioSocketWrapper att =
-                    (NioEndpoint.NioSocketWrapper) getSocket().getAttachment();
-            if (att == null) {
-                throw new IOException("Key must be cancelled");
+        private void addToBuffers(byte[] buf, int offset, int length) {
+            ByteBufferHolder holder = bufferedWrites.peekLast();
+            if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
+                ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
+                holder = new ByteBufferHolder(buffer,false);
+                bufferedWrites.add(holder);
             }
+            holder.getBuf().put(buf,offset,length);
+        }
 
-            ByteBuffer writeBuffer = getSocket().getBufHandler().getWriteBuffer();
-            writeBuffer.clear();
-            writeBuffer.put(b, off, len);
-            writeBuffer.flip();
+
+        private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
+            if (flip) {
+                bytebuffer.flip();
+                writeBufferFlipped = true;
+            }
 
             int written = 0;
-            long writeTimeout = att.getWriteTimeout();
+            long writeTimeout = getWriteTimeout();
             Selector selector = null;
             try {
                 selector = pool.get();
-            } catch ( IOException x ) {
-                //ignore
+            } catch (IOException x) {
+                // Ignore
             }
             try {
-                written = pool.write(writeBuffer, getSocket(), selector,
-                        writeTimeout, block);
+                written = pool.write(bytebuffer, getSocket(), selector, writeTimeout, block);
+                // Make sure we are flushed
+                do {
+                    if (getSocket().flush(true, selector, writeTimeout)) break;
+                } while (true);
             } finally {
                 if (selector != null) {
                     pool.put(selector);
                 }
             }
-            if (written < len) {
-                getSocket().getPoller().add(getSocket(), SelectionKey.OP_WRITE);
-            }
+            if (block || bytebuffer.remaining() == 0) {
+                // Blocking writes must empty the buffer
+                // and if remaining==0 then we did empty it
+                bytebuffer.clear();
+                writeBufferFlipped = false;
+            }
+            // If there is data left in the buffer the socket will be registered for
+            // write further up the stack. This is to ensure the socket is only
+            // registered for write once as both container and user code can trigger
+            // write registration.
             return written;
         }
 
-
         @Override
         public void regsiterForEvent(boolean read, boolean write) {
             SelectionKey key = getSocket().getIOChannel().keyFor(

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan  8 13:09:54 2015
@@ -21,10 +21,13 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.tomcat.util.buf.ByteBufferHolder;
+
 public abstract class SocketWrapperBase<E> {
 
     private volatile E socket;
@@ -67,6 +70,23 @@ public abstract class SocketWrapperBase<
      */
     private final Object writeThreadLock = new Object();
 
+    protected ByteBuffer socketWriteBuffer;
+    protected volatile boolean writeBufferFlipped;
+
+    /**
+     * For "non-blocking" writes use an external set of buffers. Although the
+     * API only allows one non-blocking write at a time, due to buffering and
+     * the possible need to write HTTP headers, there may be more than one write
+     * to the OutputBuffer.
+     */
+    protected final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
+            new LinkedBlockingDeque<>();
+
+    /**
+     * The max size of the buffered write buffer
+     */
+    protected int bufferedWriteSize = 64*1024; //64k default write buffer
+
     private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();
 
     public SocketWrapperBase(E socket, AbstractEndpoint<E> endpoint) {
@@ -157,6 +177,28 @@ public abstract class SocketWrapperBase<
         return blockingStatusWriteLock;
     }
     public Object getWriteThreadLock() { return writeThreadLock; }
+
+    protected boolean hasMoreDataToFlush() {
+        return (writeBufferFlipped && socketWriteBuffer.remaining() > 0) ||
+        (!writeBufferFlipped && socketWriteBuffer.position() > 0);
+    }
+
+    protected boolean hasBufferedData() {
+        boolean result = false;
+        if (bufferedWrites!=null) {
+            Iterator<ByteBufferHolder> iter = bufferedWrites.iterator();
+            while (!result && iter.hasNext()) {
+                result = iter.next().hasData();
+            }
+        }
+        return result;
+    }
+
+    public boolean hasDataToWrite() {
+        return hasMoreDataToFlush() || hasBufferedData();
+    }
+
+
     public void addDispatch(DispatchType dispatchType) {
         synchronized (dispatches) {
             dispatches.add(dispatchType);
@@ -233,7 +275,52 @@ public abstract class SocketWrapperBase<
     public abstract void unRead(ByteBuffer input);
     public abstract void close() throws IOException;
 
-    public abstract int write(boolean block, byte[] b, int off, int len) throws IOException;
+    /**
+     * Writes the provided data to the socket, buffering any remaining data if
+     * used in non-blocking mode. If any data remains in the buffers from a
+     * previous write then that data will be written before this data. It is
+     * therefore unnecessary to call flush() before calling this method.
+     *
+     * @param block <code>true<code> if a blocking write should be used,
+     *                  otherwise a non-blocking write will be used
+     * @param b     The byte array containing the data to be written
+     * @param off   The offset within the byte array of the data to be written
+     * @param len   The length of the data to be written
+     *
+     * @throws IOException If an IO error occurs during the write
+     */
+    public abstract void write(boolean block, byte[] b, int off, int len) throws IOException;
+
+    /**
+     * Writes as much data as possible from any that remains in the buffers.
+     *
+     * @param block <code>true<code> if a blocking write should be used,
+     *                  otherwise a non-blocking write will be used
+     *
+     * @return <code>true</code> if data remains to be flushed after this method
+     *         completes, otherwise <code>false</code>. In blocking mode
+     *         therefore, the return value should always be <code>false</code>
+     *
+     * @throws IOException If an IO error occurs during the write
+     */
+    public abstract boolean flush(boolean block) throws IOException;
 
     public abstract void regsiterForEvent(boolean read, boolean write);
+
+
+    // --------------------------------------------------------- Utility methods
+
+    protected static int transfer(byte[] from, int offset, int length, ByteBuffer to) {
+        int max = Math.min(length, to.remaining());
+        to.put(from, offset, max);
+        return max;
+    }
+
+    protected static void transfer(ByteBuffer from, ByteBuffer to) {
+        int max = Math.min(from.remaining(), to.remaining());
+        int fromLimit = from.limit();
+        from.limit(from.position() + max);
+        to.put(from);
+        from.limit(fromLimit);
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org