You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/05/02 22:59:05 UTC

svn commit: r1478542 - in /tomcat/trunk/java/org/apache/coyote/http11: AbstractOutputBuffer.java InternalAprOutputBuffer.java InternalNioOutputBuffer.java InternalOutputBuffer.java

Author: markt
Date: Thu May  2 20:59:05 2013
New Revision: 1478542

URL: http://svn.apache.org/r1478542
Log:
Copy buffering for non-blocking writes from NIO to APR and align code

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java Thu May  2 20:59:05 2013
@@ -62,7 +62,7 @@ public abstract class AbstractOutputBuff
     /**
      * The buffer used for header composition.
      */
-    protected byte[] buf;
+    protected byte[] headerBuffer;
 
 
     /**
@@ -380,7 +380,7 @@ public abstract class AbstractOutputBuff
 
         // Write protocol name
         write(Constants.HTTP_11_BYTES);
-        buf[pos++] = Constants.SP;
+        headerBuffer[pos++] = Constants.SP;
 
         // Write status code
         int status = response.getStatus();
@@ -398,7 +398,7 @@ public abstract class AbstractOutputBuff
             write(status);
         }
 
-        buf[pos++] = Constants.SP;
+        headerBuffer[pos++] = Constants.SP;
 
         // Write message
         String message = null;
@@ -418,15 +418,15 @@ public abstract class AbstractOutputBuff
                 new PrivilegedAction<Void>(){
                     @Override
                     public Void run(){
-                        buf[pos++] = Constants.CR;
-                        buf[pos++] = Constants.LF;
+                        headerBuffer[pos++] = Constants.CR;
+                        headerBuffer[pos++] = Constants.LF;
                         return null;
                     }
                 }
            );
         } else {
-            buf[pos++] = Constants.CR;
-            buf[pos++] = Constants.LF;
+            headerBuffer[pos++] = Constants.CR;
+            headerBuffer[pos++] = Constants.LF;
         }
 
     }
@@ -441,11 +441,11 @@ public abstract class AbstractOutputBuff
     public void sendHeader(MessageBytes name, MessageBytes value) {
 
         write(name);
-        buf[pos++] = Constants.COLON;
-        buf[pos++] = Constants.SP;
+        headerBuffer[pos++] = Constants.COLON;
+        headerBuffer[pos++] = Constants.SP;
         write(value);
-        buf[pos++] = Constants.CR;
-        buf[pos++] = Constants.LF;
+        headerBuffer[pos++] = Constants.CR;
+        headerBuffer[pos++] = Constants.LF;
 
     }
 
@@ -455,8 +455,8 @@ public abstract class AbstractOutputBuff
      */
     public void endHeaders() {
 
-        buf[pos++] = Constants.CR;
-        buf[pos++] = Constants.LF;
+        headerBuffer[pos++] = Constants.CR;
+        headerBuffer[pos++] = Constants.LF;
 
     }
 
@@ -495,7 +495,7 @@ public abstract class AbstractOutputBuff
         // Writing the byte chunk to the output buffer
         int length = bc.getLength();
         checkLengthBeforeWrite(length);
-        System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos, length);
+        System.arraycopy(bc.getBytes(), bc.getStart(), headerBuffer, pos, length);
         pos = pos + length;
 
     }
@@ -523,7 +523,7 @@ public abstract class AbstractOutputBuff
             if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
                 c = ' ';
             }
-            buf[pos++] = (byte) c;
+            headerBuffer[pos++] = (byte) c;
         }
 
     }
@@ -540,7 +540,7 @@ public abstract class AbstractOutputBuff
         checkLengthBeforeWrite(b.length);
 
         // Writing the byte chunk to the output buffer
-        System.arraycopy(b, 0, buf, pos, b.length);
+        System.arraycopy(b, 0, headerBuffer, pos, b.length);
         pos = pos + b.length;
 
     }
@@ -570,7 +570,7 @@ public abstract class AbstractOutputBuff
             if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
                 c = ' ';
             }
-            buf[pos++] = (byte) c;
+            headerBuffer[pos++] = (byte) c;
         }
 
     }
@@ -595,7 +595,7 @@ public abstract class AbstractOutputBuff
      * requested number of bytes.
      */
     private void checkLengthBeforeWrite(int length) {
-        if (pos + length > buf.length) {
+        if (pos + length > headerBuffer.length) {
             throw new HeadersTooLargeException(
                     sm.getString("iob.responseheadertoolarge.error"));
         }

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu May  2 20:59:05 2013
@@ -19,6 +19,7 @@ package org.apache.coyote.http11;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 
 import org.apache.coyote.OutputBuffer;
 import org.apache.coyote.Response;
@@ -35,7 +36,6 @@ import org.apache.tomcat.util.net.Socket
  */
 public class InternalAprOutputBuffer extends AbstractOutputBuffer<Long> {
 
-
     // ----------------------------------------------------------- Constructors
 
     /**
@@ -45,7 +45,7 @@ public class InternalAprOutputBuffer ext
 
         this.response = response;
 
-        buf = new byte[headerBufferSize];
+        headerBuffer = new byte[headerBufferSize];
         if (headerBufferSize < (8 * 1024)) {
             bbuf = ByteBuffer.allocateDirect(6 * 1500);
         } else {
@@ -141,27 +141,108 @@ public class InternalAprOutputBuffer ext
 
         if (pos > 0) {
             // Sending the response header buffer
-            bbuf.put(buf, 0, pos);
+            bbuf.put(headerBuffer, 0, pos);
+        }
+
+    }
+
+
+    private synchronized void addToBB(byte[] buf, int offset, int length)
+            throws IOException {
+
+        if (length == 0) return;
+
+        // Try to flush any data in the socket's write buffer first
+        boolean dataLeft = flushBuffer(isBlocking());
+
+        // Keep writing until all the data is written or a non-blocking write
+        // leaves data in the buffer
+        while (!dataLeft && length > 0) {
+            int thisTime = length;
+            if (bbuf.position() == bbuf.capacity()) {
+                flushBuffer(isBlocking());
+            }
+            if (thisTime > bbuf.capacity() - bbuf.position()) {
+                thisTime = bbuf.capacity() - bbuf.position();
+            }
+            bbuf.put(buf, offset, thisTime);
+            length = length - thisTime;
+            offset = offset + thisTime;
+        }
+
+        // TODO: Review how to update the SocketWrapper's last accessed time
+
+        if (!isBlocking() && length>0) {
+            // Buffer the remaining data
+            addToBuffers(buf, offset, length);
         }
 
     }
 
 
+    private void addToBuffers(byte[] buf, int offset, int length) {
+        ByteBufferHolder holder = bufferedWrites.peekLast();
+        if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
+            ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
+            holder = new ByteBufferHolder(buffer,false);
+            bufferedWrites.add(holder);
+        }
+        holder.getBuf().put(buf,offset,length);
+    }
+
+
     /**
      * Callback to write data from the buffer.
      */
     @Override
     protected boolean flushBuffer(boolean block) throws IOException {
-        // TODO: Non-blocking IO not yet implemented so always block parameter
-        //       ignored
-        if (bbuf.position() > 0) {
-            if (Socket.sendbb(socket, 0, bbuf.position()) < 0) {
-                throw new IOException();
+
+        // TODO: Review how to update the SocketWrapper's last accessed time
+
+        boolean dataLeft = hasMoreDataToFlush();
+
+        if (dataLeft) {
+            writeToSocket();
+        }
+
+        if (!dataLeft && bufferedWrites!=null) {
+            Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
+            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
+                ByteBufferHolder buffer = bufIter.next();
+                buffer.flip();
+                while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+                    transfer(buffer.getBuf(), bbuf);
+                    if (buffer.getBuf().remaining() == 0) {
+                        bufIter.remove();
+                    }
+                    writeToSocket();
+                    //here we must break if we didn't finish the write
+                }
             }
-            bbuf.clear();
         }
-        // TODO: Non-blocking IO not yet implemented so always returns false
-        return false;
+
+        dataLeft = hasMoreDataToFlush();
+
+        return hasMoreDataToFlush();
+    }
+
+
+    private void writeToSocket() throws IOException {
+        // TODO Implement non-blocking writes
+        if (Socket.sendbb(socket, 0, bbuf.position()) < 0) {
+            throw new IOException();
+        }
+        bbuf.clear();
+
+    }
+
+
+    private void transfer(ByteBuffer from, ByteBuffer to) {
+        int max = Math.min(from.remaining(), to.remaining());
+        ByteBuffer tmp = from.duplicate ();
+        tmp.limit (tmp.position() + max);
+        to.put (tmp);
+        from.position(from.position() + max);
     }
 
 
@@ -169,8 +250,7 @@ public class InternalAprOutputBuffer ext
 
     @Override
     protected boolean hasMoreDataToFlush() {
-        // TODO
-        return false;
+        return bbuf.position() > 0;
     }
 
 
@@ -187,24 +267,12 @@ public class InternalAprOutputBuffer ext
          * Write chunk.
          */
         @Override
-        public int doWrite(ByteChunk chunk, Response res)
-            throws IOException {
+        public int doWrite(ByteChunk chunk, Response res) throws IOException {
 
             int len = chunk.getLength();
             int start = chunk.getStart();
             byte[] b = chunk.getBuffer();
-            while (len > 0) {
-                int thisTime = len;
-                if (bbuf.position() == bbuf.capacity()) {
-                    flushBuffer(isBlocking());
-                }
-                if (thisTime > bbuf.capacity() - bbuf.position()) {
-                    thisTime = bbuf.capacity() - bbuf.position();
-                }
-                bbuf.put(b, start, thisTime);
-                len = len - thisTime;
-                start = start + thisTime;
-            }
+            addToBB(b, start,len);
             byteCount += chunk.getLength();
             return chunk.getLength();
         }

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Thu May  2 20:59:05 2013
@@ -50,7 +50,7 @@ public class InternalNioOutputBuffer ext
 
         this.response = response;
 
-        buf = new byte[headerBufferSize];
+        headerBuffer = new byte[headerBufferSize];
 
         outputStreamOutputBuffer = new SocketOutputBuffer();
 
@@ -185,18 +185,22 @@ public class InternalNioOutputBuffer ext
 
         if (pos > 0) {
             // Sending the response header buffer
-            addToBB(buf, 0, pos);
+            addToBB(headerBuffer, 0, pos);
         }
 
     }
 
 
-    private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
-        //try to write to socket first
-        if (length==0) return;
+    private synchronized void addToBB(byte[] buf, int offset, int length)
+            throws IOException {
+
+        if (length == 0) return;
 
+        // Try to flush any data in the socket's write buffer first
         boolean dataLeft = flushBuffer(isBlocking());
 
+        // Keep writing until all the data is written or a non-blocking write
+        // leaves data in the buffer
         while (!dataLeft && length>0) {
             int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
             length = length - thisTime;
@@ -215,6 +219,7 @@ public class InternalNioOutputBuffer ext
         }
     }
 
+
     private void addToBuffers(byte[] buf, int offset, int length) {
         ByteBufferHolder holder = bufferedWrites.peekLast();
         if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
@@ -264,9 +269,7 @@ public class InternalNioOutputBuffer ext
             }
         }
 
-        dataLeft = hasMoreDataToFlush();
-
-        return dataLeft;
+        return hasMoreDataToFlush();
     }
 
     @Override
@@ -281,13 +284,12 @@ public class InternalNioOutputBuffer ext
         return max;
     }
 
-    private int transfer(ByteBuffer from, ByteBuffer to) {
+    private void transfer(ByteBuffer from, ByteBuffer to) {
         int max = Math.min(from.remaining(), to.remaining());
         ByteBuffer tmp = from.duplicate ();
         tmp.limit (tmp.position() + max);
         to.put (tmp);
         from.position(from.position() + max);
-        return max;
     }
 
 
@@ -297,16 +299,14 @@ public class InternalNioOutputBuffer ext
      * This class is an output buffer which will write data to an output
      * stream.
      */
-    protected class SocketOutputBuffer
-        implements OutputBuffer {
+    protected class SocketOutputBuffer implements OutputBuffer {
 
 
         /**
          * Write chunk.
          */
         @Override
-        public int doWrite(ByteChunk chunk, Response res)
-            throws IOException {
+        public int doWrite(ByteChunk chunk, Response res) throws IOException {
 
             int len = chunk.getLength();
             int start = chunk.getStart();

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java Thu May  2 20:59:05 2013
@@ -44,7 +44,7 @@ public class InternalOutputBuffer extend
 
         this.response = response;
 
-        buf = new byte[headerBufferSize];
+        headerBuffer = new byte[headerBufferSize];
 
         outputStreamOutputBuffer = new OutputStreamOutputBuffer();
 
@@ -161,9 +161,9 @@ public class InternalOutputBuffer extend
         if (pos > 0) {
             // Sending the response header buffer
             if (useSocketBuffer) {
-                socketBuffer.append(buf, 0, pos);
+                socketBuffer.append(headerBuffer, 0, pos);
             } else {
-                outputStream.write(buf, 0, pos);
+                outputStream.write(headerBuffer, 0, pos);
             }
         }
 
@@ -186,7 +186,7 @@ public class InternalOutputBuffer extend
 
     @Override
     protected boolean hasMoreDataToFlush() {
-        // TODO
+        // Blocking IO so always returns false.
         return false;
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org