You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/05/02 22:59:05 UTC
svn commit: r1478542 - in /tomcat/trunk/java/org/apache/coyote/http11:
AbstractOutputBuffer.java InternalAprOutputBuffer.java
InternalNioOutputBuffer.java InternalOutputBuffer.java
Author: markt
Date: Thu May 2 20:59:05 2013
New Revision: 1478542
URL: http://svn.apache.org/r1478542
Log:
Copy buffering for non-blocking writes from NIO to APR and align code
Modified:
tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java Thu May 2 20:59:05 2013
@@ -62,7 +62,7 @@ public abstract class AbstractOutputBuff
/**
* The buffer used for header composition.
*/
- protected byte[] buf;
+ protected byte[] headerBuffer;
/**
@@ -380,7 +380,7 @@ public abstract class AbstractOutputBuff
// Write protocol name
write(Constants.HTTP_11_BYTES);
- buf[pos++] = Constants.SP;
+ headerBuffer[pos++] = Constants.SP;
// Write status code
int status = response.getStatus();
@@ -398,7 +398,7 @@ public abstract class AbstractOutputBuff
write(status);
}
- buf[pos++] = Constants.SP;
+ headerBuffer[pos++] = Constants.SP;
// Write message
String message = null;
@@ -418,15 +418,15 @@ public abstract class AbstractOutputBuff
new PrivilegedAction<Void>(){
@Override
public Void run(){
- buf[pos++] = Constants.CR;
- buf[pos++] = Constants.LF;
+ headerBuffer[pos++] = Constants.CR;
+ headerBuffer[pos++] = Constants.LF;
return null;
}
}
);
} else {
- buf[pos++] = Constants.CR;
- buf[pos++] = Constants.LF;
+ headerBuffer[pos++] = Constants.CR;
+ headerBuffer[pos++] = Constants.LF;
}
}
@@ -441,11 +441,11 @@ public abstract class AbstractOutputBuff
public void sendHeader(MessageBytes name, MessageBytes value) {
write(name);
- buf[pos++] = Constants.COLON;
- buf[pos++] = Constants.SP;
+ headerBuffer[pos++] = Constants.COLON;
+ headerBuffer[pos++] = Constants.SP;
write(value);
- buf[pos++] = Constants.CR;
- buf[pos++] = Constants.LF;
+ headerBuffer[pos++] = Constants.CR;
+ headerBuffer[pos++] = Constants.LF;
}
@@ -455,8 +455,8 @@ public abstract class AbstractOutputBuff
*/
public void endHeaders() {
- buf[pos++] = Constants.CR;
- buf[pos++] = Constants.LF;
+ headerBuffer[pos++] = Constants.CR;
+ headerBuffer[pos++] = Constants.LF;
}
@@ -495,7 +495,7 @@ public abstract class AbstractOutputBuff
// Writing the byte chunk to the output buffer
int length = bc.getLength();
checkLengthBeforeWrite(length);
- System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos, length);
+ System.arraycopy(bc.getBytes(), bc.getStart(), headerBuffer, pos, length);
pos = pos + length;
}
@@ -523,7 +523,7 @@ public abstract class AbstractOutputBuff
if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
c = ' ';
}
- buf[pos++] = (byte) c;
+ headerBuffer[pos++] = (byte) c;
}
}
@@ -540,7 +540,7 @@ public abstract class AbstractOutputBuff
checkLengthBeforeWrite(b.length);
// Writing the byte chunk to the output buffer
- System.arraycopy(b, 0, buf, pos, b.length);
+ System.arraycopy(b, 0, headerBuffer, pos, b.length);
pos = pos + b.length;
}
@@ -570,7 +570,7 @@ public abstract class AbstractOutputBuff
if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
c = ' ';
}
- buf[pos++] = (byte) c;
+ headerBuffer[pos++] = (byte) c;
}
}
@@ -595,7 +595,7 @@ public abstract class AbstractOutputBuff
* requested number of bytes.
*/
private void checkLengthBeforeWrite(int length) {
- if (pos + length > buf.length) {
+ if (pos + length > headerBuffer.length) {
throw new HeadersTooLargeException(
sm.getString("iob.responseheadertoolarge.error"));
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu May 2 20:59:05 2013
@@ -19,6 +19,7 @@ package org.apache.coyote.http11;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
@@ -35,7 +36,6 @@ import org.apache.tomcat.util.net.Socket
*/
public class InternalAprOutputBuffer extends AbstractOutputBuffer<Long> {
-
// ----------------------------------------------------------- Constructors
/**
@@ -45,7 +45,7 @@ public class InternalAprOutputBuffer ext
this.response = response;
- buf = new byte[headerBufferSize];
+ headerBuffer = new byte[headerBufferSize];
if (headerBufferSize < (8 * 1024)) {
bbuf = ByteBuffer.allocateDirect(6 * 1500);
} else {
@@ -141,27 +141,108 @@ public class InternalAprOutputBuffer ext
if (pos > 0) {
// Sending the response header buffer
- bbuf.put(buf, 0, pos);
+ bbuf.put(headerBuffer, 0, pos);
+ }
+
+ }
+
+
+ private synchronized void addToBB(byte[] buf, int offset, int length)
+ throws IOException {
+
+ if (length == 0) return;
+
+ // Try to flush any data in the socket's write buffer first
+ boolean dataLeft = flushBuffer(isBlocking());
+
+ // Keep writing until all the data is written or a non-blocking write
+ // leaves data in the buffer
+ while (!dataLeft && length > 0) {
+ int thisTime = length;
+ if (bbuf.position() == bbuf.capacity()) {
+ flushBuffer(isBlocking());
+ }
+ if (thisTime > bbuf.capacity() - bbuf.position()) {
+ thisTime = bbuf.capacity() - bbuf.position();
+ }
+ bbuf.put(buf, offset, thisTime);
+ length = length - thisTime;
+ offset = offset + thisTime;
+ }
+
+ // TODO: Review how to update the SocketWrapper's last accessed time
+
+ if (!isBlocking() && length>0) {
+ // Buffer the remaining data
+ addToBuffers(buf, offset, length);
}
}
+ private void addToBuffers(byte[] buf, int offset, int length) {
+ ByteBufferHolder holder = bufferedWrites.peekLast();
+ if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
+ ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
+ holder = new ByteBufferHolder(buffer,false);
+ bufferedWrites.add(holder);
+ }
+ holder.getBuf().put(buf,offset,length);
+ }
+
+
/**
* Callback to write data from the buffer.
*/
@Override
protected boolean flushBuffer(boolean block) throws IOException {
- // TODO: Non-blocking IO not yet implemented so always block parameter
- // ignored
- if (bbuf.position() > 0) {
- if (Socket.sendbb(socket, 0, bbuf.position()) < 0) {
- throw new IOException();
+
+ // TODO: Review how to update the SocketWrapper's last accessed time
+
+ boolean dataLeft = hasMoreDataToFlush();
+
+ if (dataLeft) {
+ writeToSocket();
+ }
+
+ if (!dataLeft && bufferedWrites!=null) {
+ Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
+ while (!hasMoreDataToFlush() && bufIter.hasNext()) {
+ ByteBufferHolder buffer = bufIter.next();
+ buffer.flip();
+ while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+ transfer(buffer.getBuf(), bbuf);
+ if (buffer.getBuf().remaining() == 0) {
+ bufIter.remove();
+ }
+ writeToSocket();
+ //here we must break if we didn't finish the write
+ }
}
- bbuf.clear();
}
- // TODO: Non-blocking IO not yet implemented so always returns false
- return false;
+
+ dataLeft = hasMoreDataToFlush();
+
+ return hasMoreDataToFlush();
+ }
+
+
+ private void writeToSocket() throws IOException {
+ // TODO Implement non-blocking writes
+ if (Socket.sendbb(socket, 0, bbuf.position()) < 0) {
+ throw new IOException();
+ }
+ bbuf.clear();
+
+ }
+
+
+ private void transfer(ByteBuffer from, ByteBuffer to) {
+ int max = Math.min(from.remaining(), to.remaining());
+ ByteBuffer tmp = from.duplicate ();
+ tmp.limit (tmp.position() + max);
+ to.put (tmp);
+ from.position(from.position() + max);
}
@@ -169,8 +250,7 @@ public class InternalAprOutputBuffer ext
@Override
protected boolean hasMoreDataToFlush() {
- // TODO
- return false;
+ return bbuf.position() > 0;
}
@@ -187,24 +267,12 @@ public class InternalAprOutputBuffer ext
* Write chunk.
*/
@Override
- public int doWrite(ByteChunk chunk, Response res)
- throws IOException {
+ public int doWrite(ByteChunk chunk, Response res) throws IOException {
int len = chunk.getLength();
int start = chunk.getStart();
byte[] b = chunk.getBuffer();
- while (len > 0) {
- int thisTime = len;
- if (bbuf.position() == bbuf.capacity()) {
- flushBuffer(isBlocking());
- }
- if (thisTime > bbuf.capacity() - bbuf.position()) {
- thisTime = bbuf.capacity() - bbuf.position();
- }
- bbuf.put(b, start, thisTime);
- len = len - thisTime;
- start = start + thisTime;
- }
+ addToBB(b, start,len);
byteCount += chunk.getLength();
return chunk.getLength();
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Thu May 2 20:59:05 2013
@@ -50,7 +50,7 @@ public class InternalNioOutputBuffer ext
this.response = response;
- buf = new byte[headerBufferSize];
+ headerBuffer = new byte[headerBufferSize];
outputStreamOutputBuffer = new SocketOutputBuffer();
@@ -185,18 +185,22 @@ public class InternalNioOutputBuffer ext
if (pos > 0) {
// Sending the response header buffer
- addToBB(buf, 0, pos);
+ addToBB(headerBuffer, 0, pos);
}
}
- private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
- //try to write to socket first
- if (length==0) return;
+ private synchronized void addToBB(byte[] buf, int offset, int length)
+ throws IOException {
+
+ if (length == 0) return;
+ // Try to flush any data in the socket's write buffer first
boolean dataLeft = flushBuffer(isBlocking());
+ // Keep writing until all the data is written or a non-blocking write
+ // leaves data in the buffer
while (!dataLeft && length>0) {
int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
length = length - thisTime;
@@ -215,6 +219,7 @@ public class InternalNioOutputBuffer ext
}
}
+
private void addToBuffers(byte[] buf, int offset, int length) {
ByteBufferHolder holder = bufferedWrites.peekLast();
if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
@@ -264,9 +269,7 @@ public class InternalNioOutputBuffer ext
}
}
- dataLeft = hasMoreDataToFlush();
-
- return dataLeft;
+ return hasMoreDataToFlush();
}
@Override
@@ -281,13 +284,12 @@ public class InternalNioOutputBuffer ext
return max;
}
- private int transfer(ByteBuffer from, ByteBuffer to) {
+ private void transfer(ByteBuffer from, ByteBuffer to) {
int max = Math.min(from.remaining(), to.remaining());
ByteBuffer tmp = from.duplicate ();
tmp.limit (tmp.position() + max);
to.put (tmp);
from.position(from.position() + max);
- return max;
}
@@ -297,16 +299,14 @@ public class InternalNioOutputBuffer ext
* This class is an output buffer which will write data to an output
* stream.
*/
- protected class SocketOutputBuffer
- implements OutputBuffer {
+ protected class SocketOutputBuffer implements OutputBuffer {
/**
* Write chunk.
*/
@Override
- public int doWrite(ByteChunk chunk, Response res)
- throws IOException {
+ public int doWrite(ByteChunk chunk, Response res) throws IOException {
int len = chunk.getLength();
int start = chunk.getStart();
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java?rev=1478542&r1=1478541&r2=1478542&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java Thu May 2 20:59:05 2013
@@ -44,7 +44,7 @@ public class InternalOutputBuffer extend
this.response = response;
- buf = new byte[headerBufferSize];
+ headerBuffer = new byte[headerBufferSize];
outputStreamOutputBuffer = new OutputStreamOutputBuffer();
@@ -161,9 +161,9 @@ public class InternalOutputBuffer extend
if (pos > 0) {
// Sending the response header buffer
if (useSocketBuffer) {
- socketBuffer.append(buf, 0, pos);
+ socketBuffer.append(headerBuffer, 0, pos);
} else {
- outputStream.write(buf, 0, pos);
+ outputStream.write(headerBuffer, 0, pos);
}
}
@@ -186,7 +186,7 @@ public class InternalOutputBuffer extend
@Override
protected boolean hasMoreDataToFlush() {
- // TODO
+ // Blocking IO so always returns false.
return false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org