You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/09/06 00:22:40 UTC
svn commit: r1520443 -
/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
Author: markt
Date: Thu Sep 5 22:22:40 2013
New Revision: 1520443
URL: http://svn.apache.org/r1520443
Log:
Implement Servlet 3.1 non-blocking writes for AJP. Writes are fully non-blocking, both between and within AJP messages.
Modified:
tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1520443&r1=1520442&r2=1520443&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Thu Sep 5 22:22:40 2013
@@ -20,9 +20,12 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
+import java.nio.ByteBuffer;
import java.security.NoSuchProviderException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.http.HttpUpgradeHandler;
@@ -164,6 +167,7 @@ public abstract class AbstractAjpProcess
*/
private int responseMsgPos = -1;
+
/**
* Body message.
*/
@@ -177,6 +181,22 @@ public abstract class AbstractAjpProcess
/**
+ * The default capacity of each buffer used to hold buffered writes. A single
+ * write larger than this is given a buffer sized to fit it.
+ */
+ private int bufferedWriteSize = 64*1024; //64k default write buffer
+
+
+ /**
+ * For "non-blocking" writes use an external set of buffers. Although the
+ * API only allows one non-blocking write at a time, due to buffering and
+ * the possible need to write HTTP headers, there may be more than one write
+ * to the OutputBuffer.
+ */
+ private final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
+ new LinkedBlockingDeque<>();
+
+
+ /**
* Error flag.
*/
protected boolean error = false;
@@ -1470,6 +1490,94 @@ public abstract class AbstractAjpProcess
}
}
+ /**
+  * Sends the content of the supplied chunk to the client as a sequence of
+  * SEND_BODY_CHUNK messages, each carrying at most
+  * {@code outputMaxChunkSize} bytes.
+  * <p>
+  * The write is blocking when no write listener has been registered on the
+  * response. Otherwise any previously buffered data is flushed first and, if
+  * a message could only be partially written, the part of the chunk that
+  * has not yet been placed in a message is copied to the write buffers so
+  * it can be sent by a later call to {@code flushBufferedData()}.
+  *
+  * @param chunk the data to write
+  *
+  * @throws IOException if writing a response message fails
+  */
+ private void writeData(ByteChunk chunk) throws IOException {
+     // Prevent timeout
+     socketWrapper.access();
+
+     boolean blocking = (response.getWriteListener() == null);
+     if (!blocking) {
+         // Data buffered by earlier writes must go out before this chunk
+         flushBufferedData();
+     }
+
+     int len = chunk.getLength();
+     int off = 0;
+
+     // Write this chunk. Stop as soon as a message is only partially
+     // written (responseMsgPos != -1) since responseMessage is then in use.
+     while (responseMsgPos == -1 && len > 0) {
+         int thisTime = Math.min(len, outputMaxChunkSize);
+         responseMessage.reset();
+         responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
+         responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
+         responseMessage.end();
+         writeResponseMessage(blocking);
+
+         len -= thisTime;
+         off += thisTime;
+     }
+
+     bytesWritten += off;
+
+     // Only buffer when something is actually left over. Testing the
+     // original chunk length here would touch (and possibly allocate) a
+     // write buffer on every call, including fully completed writes.
+     if (len > 0) {
+         // The remainder starts at the chunk's own offset plus what has
+         // already been consumed, exactly as in the loop above.
+         addToBuffers(chunk.getBytes(), chunk.getOffset() + off, len);
+     }
+ }
+
+
+ /**
+  * Copies {@code length} bytes of {@code buf}, starting at {@code offset},
+  * to the end of the buffered writes.
+  * <p>
+  * The last buffer in the queue is reused when it exists, has not yet been
+  * flipped for reading and has room for the data. Otherwise a new buffer of
+  * at least {@code bufferedWriteSize} bytes is appended to the queue.
+  *
+  * @param buf    the array holding the data to buffer
+  * @param offset the index in {@code buf} of the first byte to buffer
+  * @param length the number of bytes to buffer
+  */
+ private void addToBuffers(byte[] buf, int offset, int length) {
+     ByteBufferHolder tail = bufferedWrites.peekLast();
+     boolean reusable = tail != null && !tail.isFlipped() &&
+             tail.getBuf().remaining() >= length;
+     if (!reusable) {
+         int capacity = Math.max(bufferedWriteSize, length);
+         tail = new ByteBufferHolder(ByteBuffer.allocate(capacity), false);
+         bufferedWrites.add(tail);
+     }
+     tail.getBuf().put(buf, offset, length);
+ }
+
+
+ /**
+  * Attempts, without blocking, to send everything that is waiting to be
+  * written: first any partially written response message, then the
+  * buffered writes in the order they were queued.
+  * <p>
+  * Processing stops as soon as a message cannot be written completely
+  * (i.e. {@code responseMsgPos} is no longer -1). Buffers that have been
+  * fully drained are removed from the queue.
+  *
+  * @throws IOException if writing a response message fails
+  */
+ private void flushBufferedData() throws IOException {
+
+     // A position other than -1 means a message was left half written by
+     // an earlier non-blocking write. Try to complete it first.
+     if (responseMsgPos > -1) {
+         writeResponseMessage(false);
+     }
+
+     while (responseMsgPos == -1 && !bufferedWrites.isEmpty()) {
+         // Always work on the oldest buffer
+         ByteBufferHolder head = bufferedWrites.peekFirst();
+         // No-op if this buffer was already flipped on a previous attempt
+         head.flip();
+         ByteBuffer data = head.getBuf();
+         int before = data.remaining();
+         while (responseMsgPos == -1 && data.hasRemaining()) {
+             transferToResponseMsg(data);
+             writeResponseMessage(false);
+         }
+         bytesWritten += (before - data.remaining());
+         if (!data.hasRemaining()) {
+             // Everything in this buffer has been handed to a message
+             bufferedWrites.pollFirst();
+         }
+     }
+ }
+
+
+ /**
+  * Builds a SEND_BODY_CHUNK message in {@code responseMessage} from the
+  * next bytes of the supplied buffer, consuming at most
+  * {@code outputMaxChunkSize} bytes from it.
+  * <p>
+  * The payload is added with {@code appendBytes}, the same call used when
+  * a chunk is written directly, so that buffered data is framed in exactly
+  * the same way as data that never passed through the write buffers.
+  *
+  * @param buffer the (flipped) buffer to take the data from; its position
+  *               is advanced by the number of bytes placed in the message
+  */
+ private void transferToResponseMsg(ByteBuffer buffer) {
+
+     int thisTime = Math.min(buffer.remaining(), outputMaxChunkSize);
+
+     responseMessage.reset();
+     responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
+     if (buffer.hasArray()) {
+         // Buffers created by addToBuffers() are heap buffers, so the
+         // backing array can be handed over without an extra copy.
+         int start = buffer.arrayOffset() + buffer.position();
+         responseMessage.appendBytes(buffer.array(), start, thisTime);
+         buffer.position(buffer.position() + thisTime);
+     } else {
+         // Fallback for buffers without an accessible backing array
+         byte[] data = new byte[thisTime];
+         buffer.get(data);
+         responseMessage.appendBytes(data, 0, thisTime);
+     }
+     responseMessage.end();
+ }
+
private void writeResponseMessage(boolean block) throws IOException {
int len = responseMessage.getLen();
@@ -1551,25 +1659,7 @@ public abstract class AbstractAjpProcess
}
if (!swallowResponse) {
- int len = chunk.getLength();
- // 4 - hardcoded, byte[] marshaling overhead
- int off = 0;
- while (len > 0) {
- int thisTime = len;
- if (thisTime > outputMaxChunkSize) {
- thisTime = outputMaxChunkSize;
- }
- len -= thisTime;
- responseMessage.reset();
- responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
- responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
- responseMessage.end();
- writeResponseMessage(true);
-
- off += thisTime;
- }
-
- bytesWritten += chunk.getLength();
+ writeData(chunk);
}
return chunk.getLength();
}
@@ -1579,4 +1669,47 @@ public abstract class AbstractAjpProcess
return bytesWritten;
}
}
+
+
+ protected static class ByteBufferHolder {
+ private final ByteBuffer buf;
+ private final AtomicBoolean flipped;
+ public ByteBufferHolder(ByteBuffer buf, boolean flipped) {
+ this.buf = buf;
+ this.flipped = new AtomicBoolean(flipped);
+ }
+ public ByteBuffer getBuf() {
+ return buf;
+ }
+ public boolean isFlipped() {
+ return flipped.get();
+ }
+
+ public boolean flip() {
+ if (flipped.compareAndSet(false, true)) {
+ buf.flip();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public boolean hasData() {
+ if (flipped.get()) {
+ return buf.remaining()>0;
+ } else {
+ return buf.position()>0;
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder(super.toString());
+ builder.append("[flipped=");
+ builder.append(isFlipped()?"true, remaining=" : "false, position=");
+ builder.append(isFlipped()? buf.remaining(): buf.position());
+ builder.append("]");
+ return builder.toString();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org