You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/01/08 14:09:55 UTC
svn commit: r1650269 - in /tomcat/trunk/java/org/apache: coyote/ajp/
coyote/http11/ coyote/http11/upgrade/ tomcat/util/net/
Author: markt
Date: Thu Jan 8 13:09:54 2015
New Revision: 1650269
URL: http://svn.apache.org/r1650269
Log:
Move writes and associated buffers to SocketWrapper for NIO. NIO2/APR likely broken at this point.
Modified:
tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Thu Jan 8 13:09:54 2015
@@ -24,8 +24,6 @@ import java.nio.ByteBuffer;
import java.security.NoSuchProviderException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
-import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.RequestDispatcher;
@@ -44,7 +42,6 @@ import org.apache.coyote.Response;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.HexUtils;
import org.apache.tomcat.util.buf.MessageBytes;
@@ -183,22 +180,6 @@ public class AjpProcessor<S> extends Abs
/**
- * The max size of the buffered write buffer
- */
- private int bufferedWriteSize = 64*1024; //64k default write buffer
-
-
- /**
- * For "non-blocking" writes use an external set of buffers. Although the
- * API only allows one non-blocking write at a time, due to buffering and
- * the possible need to write HTTP headers, there may be more than one write
- * to the OutputBuffer.
- */
- private final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
- new LinkedBlockingDeque<>();
-
-
- /**
* Host name (used to avoid useless B2C conversion on the host name).
*/
protected char[] hostNameC = new char[0];
@@ -605,7 +586,7 @@ public class AjpProcessor<S> extends Abs
}
case NB_WRITE_INTEREST: {
AtomicBoolean isReady = (AtomicBoolean)param;
- boolean result = bufferedWrites.size() == 0 && responseMsgPos == -1;
+ boolean result = !socketWrapper.hasDataToWrite() && responseMsgPos == -1;
isReady.set(result);
if (!result) {
registerForEvent(false, true);
@@ -647,7 +628,8 @@ public class AjpProcessor<S> extends Abs
asyncStateMachine.asyncOperation();
try {
if (hasDataToWrite()) {
- flushBufferedData();
+ boolean blocking = (response.getWriteListener() == null);
+ socketWrapper.flush(blocking);
if (hasDataToWrite()) {
// There is data to write but go via Response to
// maintain a consistent view of non-blocking state
@@ -755,7 +737,7 @@ public class AjpProcessor<S> extends Abs
}
cping = true;
try {
- output(pongMessageArray, 0, pongMessageArray.length, true);
+ socketWrapper.write(true, pongMessageArray, 0, pongMessageArray.length);
} catch (IOException e) {
setErrorState(ErrorState.CLOSE_NOW, e);
}
@@ -1053,7 +1035,7 @@ public class AjpProcessor<S> extends Abs
// Request more data immediately
if (!waitingForBodyMessage) {
- output(getBodyMessageArray, 0, getBodyMessageArray.length, true);
+ socketWrapper.write(true, getBodyMessageArray, 0, getBodyMessageArray.length);
waitingForBodyMessage = true;
}
@@ -1460,7 +1442,7 @@ public class AjpProcessor<S> extends Abs
// Write to buffer
responseMessage.end();
- output(responseMessage.getBuffer(), 0, responseMessage.getLen(), true);
+ socketWrapper.write(true, responseMessage.getBuffer(), 0, responseMessage.getLen());
}
@@ -1473,7 +1455,7 @@ public class AjpProcessor<S> extends Abs
// TODO Validate the assertion above
if (explicit && !finished) {
// Send the flush message
- output(flushMessageArray, 0, flushMessageArray.length, true);
+ socketWrapper.write(true, flushMessageArray, 0, flushMessageArray.length);
}
}
@@ -1505,22 +1487,13 @@ public class AjpProcessor<S> extends Abs
// Add the end message
if (getErrorState().isError()) {
- output(endAndCloseMessageArray, 0, endAndCloseMessageArray.length, true);
+ socketWrapper.write(true, endAndCloseMessageArray, 0, endAndCloseMessageArray.length);
} else {
- output(endMessageArray, 0, endMessageArray.length, true);
+ socketWrapper.write(true, endMessageArray, 0, endMessageArray.length);
}
}
- private int output(byte[] src, int offset, int length,
- boolean block) throws IOException {
- if (socketWrapper == null || socketWrapper.getSocket() == null)
- return -1;
-
- return socketWrapper.write(block, src, offset, length);
- }
-
-
private boolean available() {
if (endOfStream) {
return false;
@@ -1569,15 +1542,12 @@ public class AjpProcessor<S> extends Abs
socketWrapper.access();
boolean blocking = (response.getWriteListener() == null);
- if (!blocking) {
- flushBufferedData();
- }
int len = chunk.getLength();
int off = 0;
// Write this chunk
- while (responseMsgPos == -1 && len > 0) {
+ while (len > 0) {
int thisTime = len;
if (thisTime > outputMaxChunkSize) {
thisTime = outputMaxChunkSize;
@@ -1586,96 +1556,18 @@ public class AjpProcessor<S> extends Abs
responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
responseMessage.end();
- writeResponseMessage(blocking);
+ socketWrapper.write(blocking, responseMessage.getBuffer(), 0, responseMessage.getLen());
len -= thisTime;
off += thisTime;
}
bytesWritten += off;
-
- if (len > 0) {
- // Add this chunk to the buffer
- addToBuffers(chunk.getBuffer(), off, len);
- }
- }
-
-
- private void addToBuffers(byte[] buf, int offset, int length) {
- ByteBufferHolder holder = bufferedWrites.peekLast();
- if (holder == null || holder.isFlipped() || holder.getBuf().remaining() < length) {
- ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
- holder = new ByteBufferHolder(buffer, false);
- bufferedWrites.add(holder);
- }
- holder.getBuf().put(buf, offset, length);
}
private boolean hasDataToWrite() {
- return responseMsgPos != -1 || bufferedWrites.size() > 0;
- }
-
-
- private void flushBufferedData() throws IOException {
-
- if (responseMsgPos > -1) {
- // Must be using non-blocking IO
- // Partially written response message. Try and complete it.
- writeResponseMessage(false);
- }
-
- while (responseMsgPos == -1 && bufferedWrites.size() > 0) {
- // Try and write any remaining buffer data
- Iterator<ByteBufferHolder> holders = bufferedWrites.iterator();
- ByteBufferHolder holder = holders.next();
- holder.flip();
- ByteBuffer buffer = holder.getBuf();
- int initialBufferSize = buffer.remaining();
- while (responseMsgPos == -1 && buffer.remaining() > 0) {
- transferToResponseMsg(buffer);
- writeResponseMessage(false);
- }
- bytesWritten += (initialBufferSize - buffer.remaining());
- if (buffer.remaining() == 0) {
- holders.remove();
- }
- }
- }
-
-
- private void transferToResponseMsg(ByteBuffer buffer) {
-
- int thisTime = buffer.remaining();
- if (thisTime > outputMaxChunkSize) {
- thisTime = outputMaxChunkSize;
- }
-
- responseMessage.reset();
- responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
- buffer.get(responseMessage.getBuffer(), responseMessage.pos, thisTime);
- responseMessage.end();
- }
-
-
- private void writeResponseMessage(boolean block) throws IOException {
- int len = responseMessage.getLen();
- int written = 1;
- if (responseMsgPos == -1) {
- // New message. Advance the write position to the beginning
- responseMsgPos = 0;
- }
-
- while (written > 0 && responseMsgPos < len) {
- written = output(
- responseMessage.getBuffer(), responseMsgPos, len - responseMsgPos, block);
- responseMsgPos += written;
- }
-
- // Message fully written, reset the position for a new message.
- if (responseMsgPos == len) {
- responseMsgPos = -1;
- }
+ return responseMsgPos != -1 || socketWrapper.hasDataToWrite();
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java Thu Jan 8 13:09:54 2015
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.coyote.ActionCode;
@@ -653,19 +652,7 @@ public abstract class AbstractOutputBuff
public boolean hasDataToWrite() {
- return hasMoreDataToFlush() || hasBufferedData();
- }
-
-
- protected boolean hasBufferedData() {
- boolean result = false;
- if (bufferedWrites!=null) {
- Iterator<ByteBufferHolder> iter = bufferedWrites.iterator();
- while (!result && iter.hasNext()) {
- result = iter.next().hasData();
- }
- }
- return result;
+ return socketWrapper.hasDataToWrite();
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java Thu Jan 8 13:09:54 2015
@@ -417,7 +417,6 @@ public class InternalNio2OutputBuffer ex
}
}
- @Override
protected boolean hasBufferedData() {
return bufferedWrites.size() > 0;
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Thu Jan 8 13:09:54 2015
@@ -18,16 +18,11 @@
package org.apache.coyote.http11;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
import org.apache.coyote.Response;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
import org.apache.tomcat.util.net.NioChannel;
-import org.apache.tomcat.util.net.NioEndpoint;
-import org.apache.tomcat.util.net.NioSelectorPool;
+import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper;
import org.apache.tomcat.util.net.SocketWrapperBase;
/**
@@ -47,25 +42,12 @@ public class InternalNioOutputBuffer ext
}
- /**
- * Underlying socket.
- */
- private NioChannel socket;
-
- /**
- * Selector pool, for blocking reads and blocking writes
- */
- private NioSelectorPool pool;
-
-
// --------------------------------------------------------- Public Methods
@Override
public void init(SocketWrapperBase<NioChannel> socketWrapper) {
super.init(socketWrapper);
- socket = socketWrapper.getSocket();
- pool = ((NioEndpoint)socketWrapper.getEndpoint()).getSelectorPool();
- socketWriteBuffer = socket.getBufHandler().getWriteBuffer();
+ socketWriteBuffer = socketWrapper.getSocket().getBufHandler().getWriteBuffer();
}
@@ -77,7 +59,7 @@ public class InternalNioOutputBuffer ext
public void recycle() {
super.recycle();
socketWriteBuffer.clear();
- socket = null;
+ socketWrapper = null;
}
@@ -89,103 +71,19 @@ public class InternalNioOutputBuffer ext
@Override
public void sendAck() throws IOException {
if (!committed) {
- socketWriteBuffer.put(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length);
- int result = writeToSocket(socketWriteBuffer, true, true);
- if (result < 0) {
+ addToBB(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length);
+ if (flushBuffer(true)) {
throw new IOException(sm.getString("iob.failedwrite.ack"));
}
}
}
- /**
- *
- * @param bytebuffer ByteBuffer
- * @param flip boolean
- * @return int
- * @throws IOException
- */
- private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
- if ( flip ) {
- bytebuffer.flip();
- writeBufferFlipped = true;
- }
-
- int written = 0;
- NioEndpoint.NioSocketWrapper att = (NioEndpoint.NioSocketWrapper)socket.getAttachment();
- if ( att == null ) throw new IOException("Key must be cancelled");
- long writeTimeout = att.getWriteTimeout();
- Selector selector = null;
- try {
- selector = pool.get();
- } catch ( IOException x ) {
- //ignore
- }
- try {
- written = pool.write(bytebuffer, socket, selector, writeTimeout, block);
- //make sure we are flushed
- do {
- if (socket.flush(true,selector,writeTimeout)) break;
- }while ( true );
- } finally {
- if ( selector != null ) pool.put(selector);
- }
- if ( block || bytebuffer.remaining()==0) {
- //blocking writes must empty the buffer
- //and if remaining==0 then we did empty it
- bytebuffer.clear();
- writeBufferFlipped = false;
- }
- // If there is data left in the buffer the socket will be registered for
- // write further up the stack. This is to ensure the socket is only
- // registered for write once as both container and user code can trigger
- // write registration.
- return written;
- }
-
// ------------------------------------------------------ Protected Methods
@Override
- protected synchronized void addToBB(byte[] buf, int offset, int length)
- throws IOException {
-
- if (length == 0) return;
-
- // Try to flush any data in the socket's write buffer first
- boolean dataLeft = flushBuffer(isBlocking());
-
- // Keep writing until all the data is written or a non-blocking write
- // leaves data in the buffer
- while (!dataLeft && length > 0) {
- int thisTime = transfer(buf,offset,length,socketWriteBuffer);
- length = length - thisTime;
- offset = offset + thisTime;
- int written = writeToSocket(socketWriteBuffer, isBlocking(), true);
- if (written == 0) {
- dataLeft = true;
- } else {
- dataLeft = flushBuffer(isBlocking());
- }
- }
-
- // Prevent timeouts for just doing client writes
- socketWrapper.access();
-
- if (!isBlocking() && length > 0) {
- // Remaining data must be buffered
- addToBuffers(buf, offset, length);
- }
- }
-
-
- private void addToBuffers(byte[] buf, int offset, int length) {
- ByteBufferHolder holder = bufferedWrites.peekLast();
- if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
- ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
- holder = new ByteBufferHolder(buffer,false);
- bufferedWrites.add(holder);
- }
- holder.getBuf().put(buf,offset,length);
+ protected synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
+ socketWrapper.write(isBlocking(), buf, offset, length);
}
@@ -194,49 +92,12 @@ public class InternalNioOutputBuffer ext
*/
@Override
protected boolean flushBuffer(boolean block) throws IOException {
-
- //prevent timeout for async,
- SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- if (key != null) {
- NioEndpoint.NioSocketWrapper attach = (NioEndpoint.NioSocketWrapper) key.attachment();
- attach.access();
- }
-
- boolean dataLeft = hasMoreDataToFlush();
-
- //write to the socket, if there is anything to write
- if (dataLeft) {
- writeToSocket(socketWriteBuffer, block, !writeBufferFlipped);
- }
-
- dataLeft = hasMoreDataToFlush();
-
- if (!dataLeft && bufferedWrites.size() > 0) {
- Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
- while (!hasMoreDataToFlush() && bufIter.hasNext()) {
- ByteBufferHolder buffer = bufIter.next();
- buffer.flip();
- while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
- transfer(buffer.getBuf(), socketWriteBuffer);
- if (buffer.getBuf().remaining() == 0) {
- bufIter.remove();
- }
- writeToSocket(socketWriteBuffer, block, true);
- //here we must break if we didn't finish the write
- }
- }
- }
-
- return hasMoreDataToFlush();
+ return socketWrapper.flush(block);
}
@Override
protected void registerWriteInterest() throws IOException {
- NioEndpoint.NioSocketWrapper att = (NioEndpoint.NioSocketWrapper)socket.getAttachment();
- if (att == null) {
- throw new IOException("Key must be cancelled");
- }
- att.getPoller().add(socket, SelectionKey.OP_WRITE);
+ ((NioSocketWrapper) socketWrapper).getPoller().add(socketWrapper.getSocket(), SelectionKey.OP_WRITE);
}
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java Thu Jan 8 13:09:54 2015
@@ -55,18 +55,10 @@ public class UpgradeServletOutputStream
private volatile ClassLoader applicationLoader = null;
- // Writes guarded by writeLock
- private volatile byte[] buffer;
- private volatile int bufferPos;
- private volatile int bufferLimit;
- private final int asyncWriteBufferSize;
-
public UpgradeServletOutputStream(SocketWrapperBase<?> socketWrapper,
int asyncWriteBufferSize) {
this.socketWrapper = socketWrapper;
- this.asyncWriteBufferSize = asyncWriteBufferSize;
- buffer = new byte[asyncWriteBufferSize];
}
@@ -80,7 +72,7 @@ public class UpgradeServletOutputStream
// Make sure isReady() and onWritePossible() have a consistent view of
// buffer and fireListener when determining if the listener should fire
synchronized (fireListenerLock) {
- boolean result = (bufferLimit == 0);
+ boolean result = !socketWrapper.hasDataToWrite();
fireListener = !result;
return result;
}
@@ -139,7 +131,7 @@ public class UpgradeServletOutputStream
private void preWriteChecks() {
- if (bufferLimit != 0) {
+ if (socketWrapper.hasDataToWrite()) {
throw new IllegalStateException(sm.getString("upgrade.sis.write.ise"));
}
}
@@ -153,34 +145,7 @@ public class UpgradeServletOutputStream
// Simple case - blocking IO
socketWrapper.write(true, b, off, len);
} else {
- // Non-blocking IO
- // If the non-blocking read does not complete, doWrite() will add
- // the socket back into the poller. The poller may trigger a new
- // write event before this method has finished updating buffer. The
- // writeLock sync makes sure that buffer is updated before the next
- // write executes.
- int written = socketWrapper.write(false, b, off, len);
- if (written < len) {
- if (b == buffer) {
- // This is a partial write of the existing buffer. Just
- // increment the current position
- bufferPos += written;
- } else {
- // This is a new partial write
- int bytesLeft = len - written;
- if (bytesLeft > buffer.length) {
- buffer = new byte[bytesLeft];
- } else if (bytesLeft < asyncWriteBufferSize &&
- buffer.length > asyncWriteBufferSize) {
- buffer = new byte[asyncWriteBufferSize];
- }
- bufferPos = 0;
- bufferLimit = bytesLeft;
- System.arraycopy(b, off + written, buffer, bufferPos, bufferLimit);
- }
- } else {
- bufferLimit = 0;
- }
+ socketWrapper.write(false, b, off, len);
}
}
@@ -188,9 +153,7 @@ public class UpgradeServletOutputStream
protected final void onWritePossible() throws IOException {
try {
synchronized (writeLock) {
- if (bufferLimit > 0) {
- writeInternal(buffer, bufferPos, bufferLimit - bufferPos);
- }
+ socketWrapper.flush(false);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
@@ -207,7 +170,7 @@ public class UpgradeServletOutputStream
// should fire
boolean fire = false;
synchronized (fireListenerLock) {
- if (bufferLimit == 0 && fireListener) {
+ if (!socketWrapper.hasDataToWrite() && fireListener) {
fireListener = false;
fire = true;
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Jan 8 13:09:54 2015
@@ -2505,7 +2505,7 @@ public class AprEndpoint extends Abstrac
@Override
- public int write(boolean block, byte[] b, int off, int len) throws IOException {
+ public void write(boolean block, byte[] b, int off, int len) throws IOException {
if (closed) {
throw new IOException(sm.getString("apr.closed", getSocket()));
@@ -2517,7 +2517,8 @@ public class AprEndpoint extends Abstrac
readLock.lock();
try {
if (getBlockingStatus() == block) {
- return doWriteInternal(b, off, len);
+ doWriteInternal(b, off, len);
+ return;
}
} finally {
readLock.unlock();
@@ -2537,7 +2538,8 @@ public class AprEndpoint extends Abstrac
readLock.lock();
try {
writeLock.unlock();
- return doWriteInternal(b, off, len);
+ doWriteInternal(b, off, len);
+ return;
} finally {
readLock.unlock();
}
@@ -2610,5 +2612,12 @@ public class AprEndpoint extends Abstrac
((AprEndpoint) getEndpoint()).getPoller().add(
getSocket().longValue(), -1, read, write);
}
+
+
+ @Override
+ public boolean flush(boolean block) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
}
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 8 13:09:54 2015
@@ -992,9 +992,8 @@ public class Nio2Endpoint extends Abstra
@Override
- public int write(boolean block, byte[] b, int off, int len) throws IOException {
+ public void write(boolean block, byte[] b, int off, int len) throws IOException {
int leftToWrite = len;
- int count = 0;
int offset = off;
while (leftToWrite > 0) {
@@ -1011,11 +1010,10 @@ public class Nio2Endpoint extends Abstra
if (writtenThisLoop < 0) {
throw new EOFException();
}
- count += writtenThisLoop;
if (!block && writePending.availablePermits() == 0) {
// Prevent concurrent writes in non blocking mode,
// leftover data has to be buffered
- return count;
+ return;
}
offset += writtenThisLoop;
leftToWrite -= writtenThisLoop;
@@ -1024,8 +1022,6 @@ public class Nio2Endpoint extends Abstra
break;
}
}
-
- return count;
}
@@ -1070,6 +1066,12 @@ public class Nio2Endpoint extends Abstra
public void regsiterForEvent(boolean read, boolean write) {
// NO-OP. Appropriate handlers will already have been registered.
}
+
+ @Override
+ public boolean flush(boolean block) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan 8 13:09:54 2015
@@ -51,6 +51,7 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.tomcat.util.buf.ByteBufferHolder;
import org.apache.tomcat.util.collections.SynchronizedQueue;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
@@ -1297,7 +1298,6 @@ public class NioEndpoint extends Abstrac
// ---------------------------------------------------- Key Attachment Class
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
- private final int maxWrite;
private final NioSelectorPool pool;
private Poller poller = null;
@@ -1310,7 +1310,6 @@ public class NioEndpoint extends Abstrac
public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
super(channel, endpoint);
- maxWrite = channel.getBufHandler().getWriteBuffer().capacity();
pool = endpoint.getSelectorPool();
}
@@ -1341,6 +1340,8 @@ public class NioEndpoint extends Abstrac
}
writeLatch = null;
setWriteTimeout(soTimeout);
+
+ socketWriteBuffer = channel.getBufHandler().getWriteBuffer();
}
public void reset() {
@@ -1509,72 +1510,128 @@ public class NioEndpoint extends Abstrac
@Override
- public int write(boolean block, byte[] b, int off, int len) throws IOException {
- int leftToWrite = len;
- int count = 0;
- int offset = off;
+ public void write(boolean block, byte[] b, int off, int len) throws IOException {
+ // Always flush any data remaining in the buffers
+ boolean dataLeft = flush(block);
- while (leftToWrite > 0) {
- int writeThisLoop;
- int writtenThisLoop;
+ if (len == 0 || b == null) {
+ return;
+ }
- if (leftToWrite > maxWrite) {
- writeThisLoop = maxWrite;
+ ByteBuffer socketWriteBuffer = getSocket().getBufHandler().getWriteBuffer();
+
+ // Keep writing until all the data is written or a non-blocking write
+ // leaves data in the buffer
+ while (!dataLeft && len > 0) {
+ int thisTime = transfer(b, off, len, socketWriteBuffer);
+ len = len - thisTime;
+ off = off + thisTime;
+ int written = writeToSocket(socketWriteBuffer, block, true);
+ if (written == 0) {
+ dataLeft = true;
} else {
- writeThisLoop = leftToWrite;
+ dataLeft = flush(block);
}
+ }
- writtenThisLoop = writeInternal(block, b, offset, writeThisLoop);
- count += writtenThisLoop;
- offset += writtenThisLoop;
- leftToWrite -= writtenThisLoop;
+ // Prevent timeouts for just doing client writes
+ access();
- if (writtenThisLoop < writeThisLoop) {
- break;
+ if (!block && len > 0) {
+ // Remaining data must be buffered
+ addToBuffers(b, off, len);
+ }
+ }
+
+
+ @Override
+ public boolean flush(boolean block) throws IOException {
+
+ //prevent timeout for async,
+ SelectionKey key = getSocket().getIOChannel().keyFor(getSocket().getPoller().getSelector());
+ if (key != null) {
+ NioEndpoint.NioSocketWrapper attach = (NioEndpoint.NioSocketWrapper) key.attachment();
+ attach.access();
+ }
+
+ boolean dataLeft = hasMoreDataToFlush();
+
+ //write to the socket, if there is anything to write
+ if (dataLeft) {
+ writeToSocket(socketWriteBuffer, block, !writeBufferFlipped);
+ }
+
+ dataLeft = hasMoreDataToFlush();
+
+ if (!dataLeft && bufferedWrites.size() > 0) {
+ Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
+ while (!hasMoreDataToFlush() && bufIter.hasNext()) {
+ ByteBufferHolder buffer = bufIter.next();
+ buffer.flip();
+ while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+ transfer(buffer.getBuf(), socketWriteBuffer);
+ if (buffer.getBuf().remaining() == 0) {
+ bufIter.remove();
+ }
+ writeToSocket(socketWriteBuffer, block, true);
+ //here we must break if we didn't finish the write
+ }
}
}
- return count;
+ return hasMoreDataToFlush();
}
- private int writeInternal (boolean block, byte[] b, int off, int len)
- throws IOException {
-
- NioEndpoint.NioSocketWrapper att =
- (NioEndpoint.NioSocketWrapper) getSocket().getAttachment();
- if (att == null) {
- throw new IOException("Key must be cancelled");
+ private void addToBuffers(byte[] buf, int offset, int length) {
+ ByteBufferHolder holder = bufferedWrites.peekLast();
+ if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
+ ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
+ holder = new ByteBufferHolder(buffer,false);
+ bufferedWrites.add(holder);
}
+ holder.getBuf().put(buf,offset,length);
+ }
- ByteBuffer writeBuffer = getSocket().getBufHandler().getWriteBuffer();
- writeBuffer.clear();
- writeBuffer.put(b, off, len);
- writeBuffer.flip();
+
+ private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
+ if (flip) {
+ bytebuffer.flip();
+ writeBufferFlipped = true;
+ }
int written = 0;
- long writeTimeout = att.getWriteTimeout();
+ long writeTimeout = getWriteTimeout();
Selector selector = null;
try {
selector = pool.get();
- } catch ( IOException x ) {
- //ignore
+ } catch (IOException x) {
+ // Ignore
}
try {
- written = pool.write(writeBuffer, getSocket(), selector,
- writeTimeout, block);
+ written = pool.write(bytebuffer, getSocket(), selector, writeTimeout, block);
+ // Make sure we are flushed
+ do {
+ if (getSocket().flush(true, selector, writeTimeout)) break;
+ } while (true);
} finally {
if (selector != null) {
pool.put(selector);
}
}
- if (written < len) {
- getSocket().getPoller().add(getSocket(), SelectionKey.OP_WRITE);
- }
+ if (block || bytebuffer.remaining() == 0) {
+ // Blocking writes must empty the buffer
+ // and if remaining==0 then we did empty it
+ bytebuffer.clear();
+ writeBufferFlipped = false;
+ }
+ // If there is data left in the buffer the socket will be registered for
+ // write further up the stack. This is to ensure the socket is only
+ // registered for write once as both container and user code can trigger
+ // write registration.
return written;
}
-
@Override
public void regsiterForEvent(boolean read, boolean write) {
SelectionKey key = getSocket().getIOChannel().keyFor(
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1650269&r1=1650268&r2=1650269&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan 8 13:09:54 2015
@@ -21,10 +21,13 @@ import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.tomcat.util.buf.ByteBufferHolder;
+
public abstract class SocketWrapperBase<E> {
private volatile E socket;
@@ -67,6 +70,23 @@ public abstract class SocketWrapperBase<
*/
private final Object writeThreadLock = new Object();
+ protected ByteBuffer socketWriteBuffer;
+ protected volatile boolean writeBufferFlipped;
+
+ /**
+ * For "non-blocking" writes use an external set of buffers. Although the
+ * API only allows one non-blocking write at a time, due to buffering and
+ * the possible need to write HTTP headers, there may be more than one write
+ * to the OutputBuffer.
+ */
+ protected final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
+ new LinkedBlockingDeque<>();
+
+ /**
+ * The max size of the buffered write buffer
+ */
+ protected int bufferedWriteSize = 64*1024; //64k default write buffer
+
private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();
public SocketWrapperBase(E socket, AbstractEndpoint<E> endpoint) {
@@ -157,6 +177,28 @@ public abstract class SocketWrapperBase<
return blockingStatusWriteLock;
}
public Object getWriteThreadLock() { return writeThreadLock; }
+
+ protected boolean hasMoreDataToFlush() {
+ return (writeBufferFlipped && socketWriteBuffer.remaining() > 0) ||
+ (!writeBufferFlipped && socketWriteBuffer.position() > 0);
+ }
+
+ protected boolean hasBufferedData() {
+ boolean result = false;
+ if (bufferedWrites!=null) {
+ Iterator<ByteBufferHolder> iter = bufferedWrites.iterator();
+ while (!result && iter.hasNext()) {
+ result = iter.next().hasData();
+ }
+ }
+ return result;
+ }
+
+ public boolean hasDataToWrite() {
+ return hasMoreDataToFlush() || hasBufferedData();
+ }
+
+
public void addDispatch(DispatchType dispatchType) {
synchronized (dispatches) {
dispatches.add(dispatchType);
@@ -233,7 +275,52 @@ public abstract class SocketWrapperBase<
public abstract void unRead(ByteBuffer input);
public abstract void close() throws IOException;
- public abstract int write(boolean block, byte[] b, int off, int len) throws IOException;
+ /**
+ * Writes the provided data to the socket, buffering any remaining data if
+ * used in non-blocking mode. If any data remains in the buffers from a
+ * previous write then that data will be written before this data. It is
+ * therefore unnecessary to call flush() before calling this method.
+ *
+ * @param block <code>true<code> if a blocking write should be used,
+ * otherwise a non-blocking write will be used
+ * @param b The byte array containing the data to be written
+ * @param off The offset within the byte array of the data to be written
+ * @param len The length of the data to be written
+ *
+ * @throws IOException If an IO error occurs during the write
+ */
+ public abstract void write(boolean block, byte[] b, int off, int len) throws IOException;
+
+ /**
+ * Writes as much data as possible from any that remains in the buffers.
+ *
+ * @param block <code>true<code> if a blocking write should be used,
+ * otherwise a non-blocking write will be used
+ *
+ * @return <code>true</code> if data remains to be flushed after this method
+ * completes, otherwise <code>false</code>. In blocking mode
+ * therefore, the return value should always be <code>false</code>
+ *
+ * @throws IOException If an IO error occurs during the write
+ */
+ public abstract boolean flush(boolean block) throws IOException;
public abstract void regsiterForEvent(boolean read, boolean write);
+
+
+ // --------------------------------------------------------- Utility methods
+
+ protected static int transfer(byte[] from, int offset, int length, ByteBuffer to) {
+ int max = Math.min(length, to.remaining());
+ to.put(from, offset, max);
+ return max;
+ }
+
+ protected static void transfer(ByteBuffer from, ByteBuffer to) {
+ int max = Math.min(from.remaining(), to.remaining());
+ int fromLimit = from.limit();
+ from.limit(from.position() + max);
+ to.put(from);
+ from.limit(fromLimit);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org