You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/09/06 00:22:40 UTC

svn commit: r1520443 - /tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java

Author: markt
Date: Thu Sep  5 22:22:40 2013
New Revision: 1520443

URL: http://svn.apache.org/r1520443
Log:
Implement Servlet 3.1 non-blocking writes for AJP. Writes are fully non-blocking, both between and within AJP messages.

Modified:
    tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1520443&r1=1520442&r2=1520443&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Thu Sep  5 22:22:40 2013
@@ -20,9 +20,12 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
 import java.security.NoSuchProviderException;
 import java.security.cert.CertificateFactory;
 import java.security.cert.X509Certificate;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.servlet.http.HttpUpgradeHandler;
@@ -164,6 +167,7 @@ public abstract class AbstractAjpProcess
      */
     private int responseMsgPos = -1;
 
+
     /**
      * Body message.
      */
@@ -177,6 +181,22 @@ public abstract class AbstractAjpProcess
 
 
     /**
+     * The default (minimum) size of each buffer allocated for buffered writes
+     */
+    private int bufferedWriteSize = 64*1024; //64k default write buffer
+
+
+    /**
+     * For "non-blocking" writes use an external set of buffers. Although the
+     * API only allows one non-blocking write at a time, due to buffering and
+     * the possible need to write HTTP headers, there may be more than one write
+     * to the OutputBuffer.
+     */
+    private final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
+            new LinkedBlockingDeque<>();
+
+
+    /**
      * Error flag.
      */
     protected boolean error = false;
@@ -1470,6 +1490,94 @@ public abstract class AbstractAjpProcess
         }
     }
 
+    private void writeData(ByteChunk chunk) throws IOException {
+        // Sends the chunk as AJP SEND_BODY_CHUNK messages. Bytes that cannot be
+        // sent while a response message is pending are copied to bufferedWrites.
+
+        // Prevent timeout
+        socketWrapper.access();
+
+        // Without a WriteListener the write is blocking. With one, data queued
+        // by earlier writes is flushed before this chunk is attempted.
+        boolean blocking = (response.getWriteListener() == null);
+        if (!blocking) {
+            flushBufferedData();
+        }
+
+        int len = chunk.getLength();
+        int off = 0;
+
+        // Write this chunk directly while no response message is pending
+        while (responseMsgPos == -1 && len > 0) {
+            int thisTime = Math.min(len, outputMaxChunkSize);
+            responseMessage.reset();
+            responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
+            responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
+            responseMessage.end();
+            writeResponseMessage(blocking);
+            len -= thisTime;
+            off += thisTime;
+        }
+        bytesWritten += off;
+
+        if (len > 0) {
+            // Buffer only the unsent tail, read relative to the chunk's offset
+            addToBuffers(chunk.getBytes(), chunk.getOffset() + off, len);
+        }
+    }
+
+
+    private void addToBuffers(byte[] buf, int offset, int length) {
+        // Start a new buffer when the newest is absent, already flipped or too small
+        ByteBufferHolder tail = bufferedWrites.peekLast();
+        if (tail == null || tail.isFlipped() || tail.getBuf().remaining() < length) {
+            tail = new ByteBufferHolder(ByteBuffer.allocate(Math.max(bufferedWriteSize, length)), false);
+            bufferedWrites.add(tail);
+        }
+        tail.getBuf().put(buf, offset, length);
+    }
+
+
+    private void flushBufferedData() throws IOException {
+        // Only used for non-blocking IO. Completes a partially written response
+        // message first, then drains bufferedWrites from the first holder on,
+        // stopping while a response message is still pending (responseMsgPos != -1).
+        if (responseMsgPos > -1) {
+            writeResponseMessage(false);
+        }
+
+        while (responseMsgPos == -1 && !bufferedWrites.isEmpty()) {
+            Iterator<ByteBufferHolder> it = bufferedWrites.iterator();
+            ByteBufferHolder head = it.next();
+            // flip() does nothing if an earlier pass already flipped this holder
+            head.flip();
+            ByteBuffer pending = head.getBuf();
+            int before = pending.remaining();
+            while (responseMsgPos == -1 && pending.hasRemaining()) {
+                transferToResponseMsg(pending);
+                writeResponseMessage(false);
+            }
+            bytesWritten += before - pending.remaining();
+            if (!pending.hasRemaining()) {
+                it.remove();
+            }
+        }
+    }
+
+
+    private void transferToResponseMsg(ByteBuffer buffer) {
+        // Moves up to outputMaxChunkSize bytes from buffer into one SEND_BODY_CHUNK message
+        int thisTime = Math.min(buffer.remaining(), outputMaxChunkSize);
+        byte[] data = new byte[thisTime];
+        buffer.get(data);
+        responseMessage.reset();
+        responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
+        // Append via appendBytes(), as writeData() does. Copying straight into
+        // getBuffer() would leave responseMessage.pos unchanged.
+        responseMessage.appendBytes(data, 0, thisTime);
+        responseMessage.end();
+    }
+
 
     private void writeResponseMessage(boolean block) throws IOException {
         int len = responseMessage.getLen();
@@ -1551,25 +1659,7 @@ public abstract class AbstractAjpProcess
             }
 
             if (!swallowResponse) {
-                int len = chunk.getLength();
-                // 4 - hardcoded, byte[] marshaling overhead
-                int off = 0;
-                while (len > 0) {
-                    int thisTime = len;
-                    if (thisTime > outputMaxChunkSize) {
-                        thisTime = outputMaxChunkSize;
-                    }
-                    len -= thisTime;
-                    responseMessage.reset();
-                    responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
-                    responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
-                    responseMessage.end();
-                    writeResponseMessage(true);
-
-                    off += thisTime;
-                }
-
-                bytesWritten += chunk.getLength();
+                writeData(chunk);
             }
             return chunk.getLength();
         }
@@ -1579,4 +1669,47 @@ public abstract class AbstractAjpProcess
             return bytesWritten;
         }
     }
+
+
+    protected static class ByteBufferHolder {
+        private final ByteBuffer buf;
+        private final AtomicBoolean flipped;
+        public ByteBufferHolder(ByteBuffer buf, boolean flipped) {
+           this.buf = buf;
+           this.flipped = new AtomicBoolean(flipped);
+        }
+        public ByteBuffer getBuf() {
+            return buf;
+        }
+        public boolean isFlipped() {
+            return flipped.get();
+        }
+
+        public boolean flip() {
+            if (flipped.compareAndSet(false, true)) {
+                buf.flip();
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        public boolean hasData() {
+            if (flipped.get()) {
+                return buf.remaining()>0;
+            } else {
+                return buf.position()>0;
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder(super.toString());
+            builder.append("[flipped=");
+            builder.append(isFlipped()?"true, remaining=" : "false, position=");
+            builder.append(isFlipped()? buf.remaining(): buf.position());
+            builder.append("]");
+            return builder.toString();
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org