You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/01/08 14:10:06 UTC

svn commit: r1650271 - in /tomcat/trunk/java/org/apache: coyote/http11/InternalAprOutputBuffer.java tomcat/util/net/AprEndpoint.java tomcat/util/net/Nio2Endpoint.java tomcat/util/net/NioEndpoint.java tomcat/util/net/SocketWrapperBase.java

Author: markt
Date: Thu Jan  8 13:10:05 2015
New Revision: 1650271

URL: http://svn.apache.org/r1650271
Log:
First (untested) pass at moving APR writes to SocketWrapper

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1650271&r1=1650270&r2=1650271&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu Jan  8 13:10:05 2015
@@ -19,14 +19,9 @@ package org.apache.coyote.http11;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.coyote.Response;
 import org.apache.tomcat.jni.Socket;
-import org.apache.tomcat.jni.Status;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
 import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.SocketWrapperBase;
@@ -76,6 +71,7 @@ public class InternalAprOutputBuffer ext
         this.endpoint = socketWrapper.getEndpoint();
 
         Socket.setsbb(this.socket, socketWriteBuffer);
+        socketWrapper.socketWriteBuffer = socketWriteBuffer;
     }
 
 
@@ -99,166 +95,25 @@ public class InternalAprOutputBuffer ext
     @Override
     public void sendAck() throws IOException {
         if (!committed) {
-            if (Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0)
+            addToBB(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length);
+            if (flushBuffer(true)) {
                 throw new IOException(sm.getString("iob.failedwrite.ack"));
-        }
-    }
-
-
-    // ------------------------------------------------------ Protected Methods
-
-    @Override
-    protected synchronized void addToBB(byte[] buf, int offset, int length)
-            throws IOException {
-
-        if (length == 0) return;
-
-        // If bbuf is currently being used for writes, add this data to the
-        // write buffer
-        if (writeBufferFlipped) {
-            addToBuffers(buf, offset, length);
-            return;
-        }
-
-        // Keep writing until all the data is written or a non-blocking write
-        // leaves data in the buffer
-        while (length > 0) {
-            int thisTime = length;
-            if (socketWriteBuffer.position() == socketWriteBuffer.capacity()) {
-                if (flushBuffer(isBlocking())) {
-                    break;
-                }
-            }
-            if (thisTime > socketWriteBuffer.capacity() - socketWriteBuffer.position()) {
-                thisTime = socketWriteBuffer.capacity() - socketWriteBuffer.position();
             }
-            socketWriteBuffer.put(buf, offset, thisTime);
-            length = length - thisTime;
-            offset = offset + thisTime;
-        }
-
-        if (!isBlocking() && length>0) {
-            // Buffer the remaining data
-            addToBuffers(buf, offset, length);
         }
     }
 
 
-    private void addToBuffers(byte[] buf, int offset, int length) {
-        ByteBufferHolder holder = bufferedWrites.peekLast();
-        if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
-            ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
-            holder = new ByteBufferHolder(buffer,false);
-            bufferedWrites.add(holder);
-        }
-        holder.getBuf().put(buf,offset,length);
-    }
-
+    // ------------------------------------------------------ Protected Methods
 
     @Override
-    protected synchronized boolean flushBuffer(boolean block)
-            throws IOException {
-
-        if (hasMoreDataToFlush()) {
-            writeToSocket(block);
-        }
-
-        if (bufferedWrites.size() > 0) {
-            Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
-            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
-                ByteBufferHolder buffer = bufIter.next();
-                buffer.flip();
-                while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
-                    transfer(buffer.getBuf(), socketWriteBuffer);
-                    if (buffer.getBuf().remaining() == 0) {
-                        bufIter.remove();
-                    }
-                    writeToSocket(block);
-                    //here we must break if we didn't finish the write
-                }
-            }
-        }
-
-        return hasMoreDataToFlush();
+    protected synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
+        socketWrapper.write(isBlocking(), buf, offset, length);
     }
 
 
-    private synchronized void writeToSocket(boolean block) throws IOException {
-
-        Lock readLock = socketWrapper.getBlockingStatusReadLock();
-        WriteLock writeLock = socketWrapper.getBlockingStatusWriteLock();
-
-        readLock.lock();
-        try {
-            if (socketWrapper.getBlockingStatus() == block) {
-                writeToSocket();
-                return;
-            }
-        } finally {
-            readLock.unlock();
-        }
-
-        writeLock.lock();
-        try {
-            // Set the current settings for this socket
-            socketWrapper.setBlockingStatus(block);
-            if (block) {
-                Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
-            } else {
-                Socket.timeoutSet(socket, 0);
-            }
-
-            // Downgrade the lock
-            readLock.lock();
-            try {
-                writeLock.unlock();
-                writeToSocket();
-            } finally {
-                readLock.unlock();
-            }
-        } finally {
-            // Should have been released above but may not have been on some
-            // exception paths
-            if (writeLock.isHeldByCurrentThread()) {
-                writeLock.unlock();
-            }
-        }
-    }
-
-    private synchronized void writeToSocket() throws IOException {
-        if (!writeBufferFlipped) {
-            writeBufferFlipped = true;
-            socketWriteBuffer.flip();
-        }
-
-        int written;
-
-        do {
-            written = Socket.sendbb(socket, socketWriteBuffer.position(), socketWriteBuffer.remaining());
-            if (Status.APR_STATUS_IS_EAGAIN(-written)) {
-                written = 0;
-            } else if (written < 0) {
-                throw new IOException("APR error: " + written);
-            }
-            socketWriteBuffer.position(socketWriteBuffer.position() + written);
-        } while (written > 0 && socketWriteBuffer.hasRemaining());
-
-        if (socketWriteBuffer.remaining() == 0) {
-            socketWriteBuffer.clear();
-            writeBufferFlipped = false;
-        }
-        // If there is data left in the buffer the socket will be registered for
-        // write further up the stack. This is to ensure the socket is only
-        // registered for write once as both container and user code can trigger
-        // write registration.
-    }
-
-
-    //-------------------------------------------------- Non-blocking IO methods
-
     @Override
-    protected synchronized boolean hasMoreDataToFlush() {
-        return super.hasMoreDataToFlush();
+    protected boolean flushBuffer(boolean block) throws IOException {
+        return socketWrapper.flush(block);
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1650271&r1=1650270&r2=1650271&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Jan  8 13:10:05 2015
@@ -2505,11 +2505,8 @@ public class AprEndpoint extends Abstrac
 
 
         @Override
-        public void write(boolean block, byte[] b, int off, int len) throws IOException {
-            doWrite(block, b, off, len);
-        }
-
-        private void doWrite(boolean block, byte[] b, int off, int len) throws IOException {
+        protected int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip)
+                throws IOException {
             if (closed) {
                 throw new IOException(sm.getString("apr.closed", getSocket()));
             }
@@ -2520,8 +2517,7 @@ public class AprEndpoint extends Abstrac
             readLock.lock();
             try {
                 if (getBlockingStatus() == block) {
-                    doWriteInternal(b, off, len);
-                    return;
+                    return doWriteInternal(bytebuffer, flip);
                 }
             } finally {
                 readLock.unlock();
@@ -2541,8 +2537,7 @@ public class AprEndpoint extends Abstrac
                 readLock.lock();
                 try {
                     writeLock.unlock();
-                    doWriteInternal(b, off, len);
-                    return;
+                    return doWriteInternal(bytebuffer, flip);
                 } finally {
                     readLock.unlock();
                 }
@@ -2556,57 +2551,66 @@ public class AprEndpoint extends Abstrac
         }
 
 
-        private int doWriteInternal(byte[] b, int off, int len) throws IOException {
+        private int doWriteInternal(ByteBuffer bytebuffer, boolean flip)
+                throws IOException {
+            if (flip) {
+                bytebuffer.flip();
+                writeBufferFlipped = true;
+            }
 
-            int start = off;
-            int left = len;
-            int written;
+            int written = 0;
+            int thisTime;
 
             do {
+                thisTime = 0;
                 if (getEndpoint().isSSLEnabled()) {
                     if (sslOutputBuffer.remaining() == 0) {
                         // Buffer was fully written last time around
                         sslOutputBuffer.clear();
-                        if (left < SSL_OUTPUT_BUFFER_SIZE) {
-                            sslOutputBuffer.put(b, start, left);
-                        } else {
-                            sslOutputBuffer.put(b, start, SSL_OUTPUT_BUFFER_SIZE);
-                        }
+                        transfer(bytebuffer, sslOutputBuffer);
                         sslOutputBuffer.flip();
+                        thisTime = sslOutputBuffer.remaining();
                     } else {
                         // Buffer still has data from previous attempt to write
                         // APR + SSL requires that exactly the same parameters are
                         // passed when re-attempting the write
                     }
-                    written = Socket.sendb(getSocket().longValue(), sslOutputBuffer,
+                    int sslWritten = Socket.sendb(getSocket().longValue(), sslOutputBuffer,
                             sslOutputBuffer.position(), sslOutputBuffer.limit());
-                    if (written > 0) {
+                    if (sslWritten > 0) {
                         sslOutputBuffer.position(
-                                sslOutputBuffer.position() + written);
+                                sslOutputBuffer.position() + sslWritten);
                     }
                 } else {
-                    written = Socket.send(getSocket().longValue(), b, start, left);
+                    thisTime = Socket.sendb(getSocket().longValue(), bytebuffer,
+                            bytebuffer.position(), bytebuffer.limit() - bytebuffer.position());
                 }
-                if (Status.APR_STATUS_IS_EAGAIN(-written)) {
-                    written = 0;
-                } else if (-written == Status.APR_EOF) {
-                    throw new EOFException(sm.getString("apr.clientAbort"));
+                if (Status.APR_STATUS_IS_EAGAIN(-thisTime)) {
+                    thisTime = 0;
+                } else if (-thisTime == Status.APR_EOF) {
+                    throw new EOFException(sm.getString("socket.apr.clientAbort"));
                 } else if ((OS.IS_WIN32 || OS.IS_WIN64) &&
-                        (-written == Status.APR_OS_START_SYSERR + 10053)) {
+                        (-thisTime == Status.APR_OS_START_SYSERR + 10053)) {
                     // 10053 on Windows is connection aborted
-                    throw new EOFException(sm.getString("apr.clientAbort"));
-                } else if (written < 0) {
-                    throw new IOException(sm.getString("apr.write.error",
-                            Integer.valueOf(-written), getSocket(), this));
-                }
-                start += written;
-                left -= written;
-            } while (written > 0 && left > 0);
-
-            if (left > 0) {
-                ((AprEndpoint) getEndpoint()).getPoller().add(getSocket().longValue(), -1, false, true);
+                    throw new EOFException(sm.getString("socket.apr.clientAbort"));
+                } else if (thisTime < 0) {
+                    throw new IOException(sm.getString("socket.apr.write.error",
+                            Integer.valueOf(-thisTime), getSocket(), this));
+                }
+                written += thisTime;
+                bytebuffer.position(bytebuffer.position() + thisTime);
+            } while (thisTime > 0 && bytebuffer.hasRemaining());
+
+            if (bytebuffer.remaining() == 0) {
+                bytebuffer.clear();
+                writeBufferFlipped = false;
             }
-            return len - left;
+            // If there is data left in the buffer the socket will be registered for
+            // write further up the stack. This is to ensure the socket is only
+            // registered for write once as both container and user code can trigger
+            // write registration.
+
+            return written;
         }
 
 
@@ -2615,12 +2619,5 @@ public class AprEndpoint extends Abstrac
             ((AprEndpoint) getEndpoint()).getPoller().add(
                     getSocket().longValue(), -1, read, write);
         }
-
-
-        @Override
-        public boolean flush(boolean block) throws IOException {
-            // TODO Auto-generated method stub
-            return false;
-        }
     }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650271&r1=1650270&r2=1650271&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan  8 13:10:05 2015
@@ -1025,6 +1025,13 @@ public class Nio2Endpoint extends Abstra
         }
 
 
+        @Override
+        protected int doWrite(ByteBuffer buffer, boolean block, boolean flip)
+                throws IOException {
+            // TODO Auto-generated method stub
+            return 0;
+        }
+
         private int writeInternal(boolean block, byte[] b, int off, int len)
                 throws IOException {
             ByteBuffer writeBuffer = getSocket().getBufHandler().getWriteBuffer();

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1650271&r1=1650270&r2=1650271&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan  8 13:10:05 2015
@@ -51,7 +51,6 @@ import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.ExceptionUtils;
 import org.apache.tomcat.util.IntrospectionUtils;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
 import org.apache.tomcat.util.collections.SynchronizedQueue;
 import org.apache.tomcat.util.collections.SynchronizedStack;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
@@ -1510,91 +1509,8 @@ public class NioEndpoint extends Abstrac
 
 
         @Override
-        public void write(boolean block, byte[] b, int off, int len) throws IOException {
-            // Always flush any data remaining in the buffers
-            boolean dataLeft = flush(block);
-
-            if (len == 0 || b == null) {
-                return;
-            }
-
-            ByteBuffer socketWriteBuffer = getSocket().getBufHandler().getWriteBuffer();
-
-            // Keep writing until all the data is written or a non-blocking write
-            // leaves data in the buffer
-            while (!dataLeft && len > 0) {
-                int thisTime = transfer(b, off, len, socketWriteBuffer);
-                len = len - thisTime;
-                off = off + thisTime;
-                int written = doWrite(socketWriteBuffer, block, true);
-                if (written == 0) {
-                    dataLeft = true;
-                } else {
-                    dataLeft = flush(block);
-                }
-            }
-
-            // Prevent timeouts for just doing client writes
-            access();
-
-            if (!block && len > 0) {
-                // Remaining data must be buffered
-                addToBuffers(b, off, len);
-            }
-        }
-
-
-        @Override
-        public boolean flush(boolean block) throws IOException {
-
-            //prevent timeout for async,
-            SelectionKey key = getSocket().getIOChannel().keyFor(getSocket().getPoller().getSelector());
-            if (key != null) {
-                NioEndpoint.NioSocketWrapper attach = (NioEndpoint.NioSocketWrapper) key.attachment();
-                attach.access();
-            }
-
-            boolean dataLeft = hasMoreDataToFlush();
-
-            //write to the socket, if there is anything to write
-            if (dataLeft) {
-                doWrite(socketWriteBuffer, block, !writeBufferFlipped);
-            }
-
-            dataLeft = hasMoreDataToFlush();
-
-            if (!dataLeft && bufferedWrites.size() > 0) {
-                Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
-                while (!hasMoreDataToFlush() && bufIter.hasNext()) {
-                    ByteBufferHolder buffer = bufIter.next();
-                    buffer.flip();
-                    while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
-                        transfer(buffer.getBuf(), socketWriteBuffer);
-                        if (buffer.getBuf().remaining() == 0) {
-                            bufIter.remove();
-                        }
-                        doWrite(socketWriteBuffer, block, true);
-                        //here we must break if we didn't finish the write
-                    }
-                }
-            }
-
-            return hasMoreDataToFlush();
-        }
-
-
-        private void addToBuffers(byte[] buf, int offset, int length) {
-            ByteBufferHolder holder = bufferedWrites.peekLast();
-            if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
-                ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
-                holder = new ByteBufferHolder(buffer,false);
-                bufferedWrites.add(holder);
-            }
-            holder.getBuf().put(buf,offset,length);
-        }
-
-
-        private synchronized int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
+        protected synchronized int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip)
+                throws IOException {
             if (flip) {
                 bytebuffer.flip();
                 writeBufferFlipped = true;
@@ -1619,9 +1535,7 @@ public class NioEndpoint extends Abstrac
                     pool.put(selector);
                 }
             }
-            if (block || bytebuffer.remaining() == 0) {
-                // Blocking writes must empty the buffer
-                // and if remaining==0 then we did empty it
+            if (bytebuffer.remaining() == 0) {
                 bytebuffer.clear();
                 writeBufferFlipped = false;
             }
@@ -1629,6 +1543,7 @@ public class NioEndpoint extends Abstrac
             // write further up the stack. This is to ensure the socket is only
             // registered for write once as both container and user code can trigger
             // write registration.
+
             return written;
         }
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1650271&r1=1650270&r2=1650271&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan  8 13:10:05 2015
@@ -70,7 +70,8 @@ public abstract class SocketWrapperBase<
      */
     private final Object writeThreadLock = new Object();
 
-    protected ByteBuffer socketWriteBuffer;
+    // TODO This being public is a temporary hack to simplify refactoring
+    public volatile ByteBuffer socketWriteBuffer;
     protected volatile boolean writeBufferFlipped;
 
     /**
@@ -289,7 +290,38 @@ public abstract class SocketWrapperBase<
      *
      * @throws IOException If an IO error occurs during the write
      */
-    public abstract void write(boolean block, byte[] b, int off, int len) throws IOException;
+    public void write(boolean block, byte[] b, int off, int len) throws IOException {
+        // Always flush any data remaining in the buffers
+        boolean dataLeft = flush(block);
+
+        if (len == 0 || b == null) {
+            return;
+        }
+
+        // Keep writing until all the data is written or a non-blocking write
+        // leaves data in the buffer
+        while (!dataLeft && len > 0) {
+            int thisTime = transfer(b, off, len, socketWriteBuffer);
+            len = len - thisTime;
+            off = off + thisTime;
+            int written = doWrite(socketWriteBuffer, block, true);
+            if (written == 0) {
+                dataLeft = true;
+            } else {
+                dataLeft = flush(block);
+            }
+        }
+
+        // Prevent timeouts for just doing client writes
+        access();
+
+        if (!block && len > 0) {
+            // Remaining data must be buffered
+            addToBuffers(b, off, len);
+        }
+    }
+
+
 
     /**
      * Writes as much data as possible from any that remains in the buffers.
@@ -303,7 +335,54 @@ public abstract class SocketWrapperBase<
      *
      * @throws IOException If an IO error occurs during the write
      */
-    public abstract boolean flush(boolean block) throws IOException;
+    public boolean flush(boolean block) throws IOException {
+
+        // Prevent timeout for async
+        access();
+
+        boolean dataLeft = hasMoreDataToFlush();
+
+        // Write to the socket, if there is anything to write
+        if (dataLeft) {
+            doWrite(socketWriteBuffer, block, !writeBufferFlipped);
+        }
+
+        dataLeft = hasMoreDataToFlush();
+
+        if (!dataLeft && bufferedWrites.size() > 0) {
+            Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
+            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
+                ByteBufferHolder buffer = bufIter.next();
+                buffer.flip();
+                while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+                    transfer(buffer.getBuf(), socketWriteBuffer);
+                    if (buffer.getBuf().remaining() == 0) {
+                        bufIter.remove();
+                    }
+                    doWrite(socketWriteBuffer, block, true);
+                    //here we must break if we didn't finish the write
+                }
+            }
+        }
+
+        return hasMoreDataToFlush();
+    }
+
+
+    protected abstract int doWrite(ByteBuffer buffer, boolean block, boolean flip)
+            throws IOException;
+
+
+    protected void addToBuffers(byte[] buf, int offset, int length) {
+        ByteBufferHolder holder = bufferedWrites.peekLast();
+        if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
+            ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
+            holder = new ByteBufferHolder(buffer,false);
+            bufferedWrites.add(holder);
+        }
+        holder.getBuf().put(buf,offset,length);
+    }
+
 
     public abstract void regsiterForEvent(boolean read, boolean write);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org