You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/01/08 14:10:06 UTC
svn commit: r1650271 - in /tomcat/trunk/java/org/apache:
coyote/http11/InternalAprOutputBuffer.java tomcat/util/net/AprEndpoint.java
tomcat/util/net/Nio2Endpoint.java tomcat/util/net/NioEndpoint.java
tomcat/util/net/SocketWrapperBase.java
Author: markt
Date: Thu Jan 8 13:10:05 2015
New Revision: 1650271
URL: http://svn.apache.org/r1650271
Log:
First (untested) pass at moving APR writes to SocketWrapper
Modified:
tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1650271&r1=1650270&r2=1650271&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu Jan 8 13:10:05 2015
@@ -19,14 +19,9 @@ package org.apache.coyote.http11;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.coyote.Response;
import org.apache.tomcat.jni.Socket;
-import org.apache.tomcat.jni.Status;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.SocketWrapperBase;
@@ -76,6 +71,7 @@ public class InternalAprOutputBuffer ext
this.endpoint = socketWrapper.getEndpoint();
Socket.setsbb(this.socket, socketWriteBuffer);
+ socketWrapper.socketWriteBuffer = socketWriteBuffer;
}
@@ -99,166 +95,25 @@ public class InternalAprOutputBuffer ext
@Override
public void sendAck() throws IOException {
if (!committed) {
- if (Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0)
+ addToBB(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length);
+ if (flushBuffer(true)) {
throw new IOException(sm.getString("iob.failedwrite.ack"));
- }
- }
-
-
- // ------------------------------------------------------ Protected Methods
-
- @Override
- protected synchronized void addToBB(byte[] buf, int offset, int length)
- throws IOException {
-
- if (length == 0) return;
-
- // If bbuf is currently being used for writes, add this data to the
- // write buffer
- if (writeBufferFlipped) {
- addToBuffers(buf, offset, length);
- return;
- }
-
- // Keep writing until all the data is written or a non-blocking write
- // leaves data in the buffer
- while (length > 0) {
- int thisTime = length;
- if (socketWriteBuffer.position() == socketWriteBuffer.capacity()) {
- if (flushBuffer(isBlocking())) {
- break;
- }
- }
- if (thisTime > socketWriteBuffer.capacity() - socketWriteBuffer.position()) {
- thisTime = socketWriteBuffer.capacity() - socketWriteBuffer.position();
}
- socketWriteBuffer.put(buf, offset, thisTime);
- length = length - thisTime;
- offset = offset + thisTime;
- }
-
- if (!isBlocking() && length>0) {
- // Buffer the remaining data
- addToBuffers(buf, offset, length);
}
}
- private void addToBuffers(byte[] buf, int offset, int length) {
- ByteBufferHolder holder = bufferedWrites.peekLast();
- if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
- ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
- holder = new ByteBufferHolder(buffer,false);
- bufferedWrites.add(holder);
- }
- holder.getBuf().put(buf,offset,length);
- }
-
+ // ------------------------------------------------------ Protected Methods
@Override
- protected synchronized boolean flushBuffer(boolean block)
- throws IOException {
-
- if (hasMoreDataToFlush()) {
- writeToSocket(block);
- }
-
- if (bufferedWrites.size() > 0) {
- Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
- while (!hasMoreDataToFlush() && bufIter.hasNext()) {
- ByteBufferHolder buffer = bufIter.next();
- buffer.flip();
- while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
- transfer(buffer.getBuf(), socketWriteBuffer);
- if (buffer.getBuf().remaining() == 0) {
- bufIter.remove();
- }
- writeToSocket(block);
- //here we must break if we didn't finish the write
- }
- }
- }
-
- return hasMoreDataToFlush();
+ protected synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
+ socketWrapper.write(isBlocking(), buf, offset, length);
}
- private synchronized void writeToSocket(boolean block) throws IOException {
-
- Lock readLock = socketWrapper.getBlockingStatusReadLock();
- WriteLock writeLock = socketWrapper.getBlockingStatusWriteLock();
-
- readLock.lock();
- try {
- if (socketWrapper.getBlockingStatus() == block) {
- writeToSocket();
- return;
- }
- } finally {
- readLock.unlock();
- }
-
- writeLock.lock();
- try {
- // Set the current settings for this socket
- socketWrapper.setBlockingStatus(block);
- if (block) {
- Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
- } else {
- Socket.timeoutSet(socket, 0);
- }
-
- // Downgrade the lock
- readLock.lock();
- try {
- writeLock.unlock();
- writeToSocket();
- } finally {
- readLock.unlock();
- }
- } finally {
- // Should have been released above but may not have been on some
- // exception paths
- if (writeLock.isHeldByCurrentThread()) {
- writeLock.unlock();
- }
- }
- }
-
- private synchronized void writeToSocket() throws IOException {
- if (!writeBufferFlipped) {
- writeBufferFlipped = true;
- socketWriteBuffer.flip();
- }
-
- int written;
-
- do {
- written = Socket.sendbb(socket, socketWriteBuffer.position(), socketWriteBuffer.remaining());
- if (Status.APR_STATUS_IS_EAGAIN(-written)) {
- written = 0;
- } else if (written < 0) {
- throw new IOException("APR error: " + written);
- }
- socketWriteBuffer.position(socketWriteBuffer.position() + written);
- } while (written > 0 && socketWriteBuffer.hasRemaining());
-
- if (socketWriteBuffer.remaining() == 0) {
- socketWriteBuffer.clear();
- writeBufferFlipped = false;
- }
- // If there is data left in the buffer the socket will be registered for
- // write further up the stack. This is to ensure the socket is only
- // registered for write once as both container and user code can trigger
- // write registration.
- }
-
-
- //-------------------------------------------------- Non-blocking IO methods
-
@Override
- protected synchronized boolean hasMoreDataToFlush() {
- return super.hasMoreDataToFlush();
+ protected boolean flushBuffer(boolean block) throws IOException {
+ return socketWrapper.flush(block);
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1650271&r1=1650270&r2=1650271&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Jan 8 13:10:05 2015
@@ -2505,11 +2505,8 @@ public class AprEndpoint extends Abstrac
@Override
- public void write(boolean block, byte[] b, int off, int len) throws IOException {
- doWrite(block, b, off, len);
- }
-
- private void doWrite(boolean block, byte[] b, int off, int len) throws IOException {
+ protected int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip)
+ throws IOException {
if (closed) {
throw new IOException(sm.getString("apr.closed", getSocket()));
}
@@ -2520,8 +2517,7 @@ public class AprEndpoint extends Abstrac
readLock.lock();
try {
if (getBlockingStatus() == block) {
- doWriteInternal(b, off, len);
- return;
+ return doWriteInternal(bytebuffer, flip);
}
} finally {
readLock.unlock();
@@ -2541,8 +2537,7 @@ public class AprEndpoint extends Abstrac
readLock.lock();
try {
writeLock.unlock();
- doWriteInternal(b, off, len);
- return;
+ return doWriteInternal(bytebuffer, flip);
} finally {
readLock.unlock();
}
@@ -2556,57 +2551,66 @@ public class AprEndpoint extends Abstrac
}
- private int doWriteInternal(byte[] b, int off, int len) throws IOException {
+ private int doWriteInternal(ByteBuffer bytebuffer, boolean flip)
+ throws IOException {
+ if (flip) {
+ bytebuffer.flip();
+ writeBufferFlipped = true;
+ }
- int start = off;
- int left = len;
- int written;
+ int written = 0;
+ int thisTime;
do {
+ thisTime = 0;
if (getEndpoint().isSSLEnabled()) {
if (sslOutputBuffer.remaining() == 0) {
// Buffer was fully written last time around
sslOutputBuffer.clear();
- if (left < SSL_OUTPUT_BUFFER_SIZE) {
- sslOutputBuffer.put(b, start, left);
- } else {
- sslOutputBuffer.put(b, start, SSL_OUTPUT_BUFFER_SIZE);
- }
+ transfer(bytebuffer, sslOutputBuffer);
sslOutputBuffer.flip();
+ thisTime = sslOutputBuffer.remaining();
} else {
// Buffer still has data from previous attempt to write
// APR + SSL requires that exactly the same parameters are
// passed when re-attempting the write
}
- written = Socket.sendb(getSocket().longValue(), sslOutputBuffer,
+ int sslWritten = Socket.sendb(getSocket().longValue(), sslOutputBuffer,
sslOutputBuffer.position(), sslOutputBuffer.limit());
- if (written > 0) {
+ if (sslWritten > 0) {
sslOutputBuffer.position(
- sslOutputBuffer.position() + written);
+ sslOutputBuffer.position() + sslWritten);
}
} else {
- written = Socket.send(getSocket().longValue(), b, start, left);
+ thisTime = Socket.sendb(getSocket().longValue(), bytebuffer,
+ bytebuffer.position(), bytebuffer.limit() - bytebuffer.position());
}
- if (Status.APR_STATUS_IS_EAGAIN(-written)) {
- written = 0;
- } else if (-written == Status.APR_EOF) {
- throw new EOFException(sm.getString("apr.clientAbort"));
+ if (Status.APR_STATUS_IS_EAGAIN(-thisTime)) {
+ thisTime = 0;
+ } else if (-thisTime == Status.APR_EOF) {
+ throw new EOFException(sm.getString("socket.apr.clientAbort"));
} else if ((OS.IS_WIN32 || OS.IS_WIN64) &&
- (-written == Status.APR_OS_START_SYSERR + 10053)) {
+ (-thisTime == Status.APR_OS_START_SYSERR + 10053)) {
// 10053 on Windows is connection aborted
- throw new EOFException(sm.getString("apr.clientAbort"));
- } else if (written < 0) {
- throw new IOException(sm.getString("apr.write.error",
- Integer.valueOf(-written), getSocket(), this));
- }
- start += written;
- left -= written;
- } while (written > 0 && left > 0);
-
- if (left > 0) {
- ((AprEndpoint) getEndpoint()).getPoller().add(getSocket().longValue(), -1, false, true);
+ throw new EOFException(sm.getString("socket.apr.clientAbort"));
+ } else if (thisTime < 0) {
+ throw new IOException(sm.getString("socket.apr.write.error",
+ Integer.valueOf(-thisTime), getSocket(), this));
+ }
+ written += thisTime;
+ bytebuffer.position(bytebuffer.position() + thisTime);
+ } while (thisTime > 0 && bytebuffer.hasRemaining());
+
+ if (bytebuffer.remaining() == 0) {
+ bytebuffer.clear();
+ writeBufferFlipped = false;
}
- return len - left;
+ // If there is data left in the buffer the socket will be registered for
+ // write further up the stack. This is to ensure the socket is only
+ // registered for write once as both container and user code can trigger
+ // write registration.
+
+ return written;
}
@@ -2615,12 +2619,5 @@ public class AprEndpoint extends Abstrac
((AprEndpoint) getEndpoint()).getPoller().add(
getSocket().longValue(), -1, read, write);
}
-
-
- @Override
- public boolean flush(boolean block) throws IOException {
- // TODO Auto-generated method stub
- return false;
- }
}
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650271&r1=1650270&r2=1650271&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 8 13:10:05 2015
@@ -1025,6 +1025,13 @@ public class Nio2Endpoint extends Abstra
}
+ @Override
+ protected int doWrite(ByteBuffer buffer, boolean block, boolean flip)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
private int writeInternal(boolean block, byte[] b, int off, int len)
throws IOException {
ByteBuffer writeBuffer = getSocket().getBufHandler().getWriteBuffer();
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1650271&r1=1650270&r2=1650271&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan 8 13:10:05 2015
@@ -51,7 +51,6 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.IntrospectionUtils;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
import org.apache.tomcat.util.collections.SynchronizedQueue;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
@@ -1510,91 +1509,8 @@ public class NioEndpoint extends Abstrac
@Override
- public void write(boolean block, byte[] b, int off, int len) throws IOException {
- // Always flush any data remaining in the buffers
- boolean dataLeft = flush(block);
-
- if (len == 0 || b == null) {
- return;
- }
-
- ByteBuffer socketWriteBuffer = getSocket().getBufHandler().getWriteBuffer();
-
- // Keep writing until all the data is written or a non-blocking write
- // leaves data in the buffer
- while (!dataLeft && len > 0) {
- int thisTime = transfer(b, off, len, socketWriteBuffer);
- len = len - thisTime;
- off = off + thisTime;
- int written = doWrite(socketWriteBuffer, block, true);
- if (written == 0) {
- dataLeft = true;
- } else {
- dataLeft = flush(block);
- }
- }
-
- // Prevent timeouts for just doing client writes
- access();
-
- if (!block && len > 0) {
- // Remaining data must be buffered
- addToBuffers(b, off, len);
- }
- }
-
-
- @Override
- public boolean flush(boolean block) throws IOException {
-
- //prevent timeout for async,
- SelectionKey key = getSocket().getIOChannel().keyFor(getSocket().getPoller().getSelector());
- if (key != null) {
- NioEndpoint.NioSocketWrapper attach = (NioEndpoint.NioSocketWrapper) key.attachment();
- attach.access();
- }
-
- boolean dataLeft = hasMoreDataToFlush();
-
- //write to the socket, if there is anything to write
- if (dataLeft) {
- doWrite(socketWriteBuffer, block, !writeBufferFlipped);
- }
-
- dataLeft = hasMoreDataToFlush();
-
- if (!dataLeft && bufferedWrites.size() > 0) {
- Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
- while (!hasMoreDataToFlush() && bufIter.hasNext()) {
- ByteBufferHolder buffer = bufIter.next();
- buffer.flip();
- while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
- transfer(buffer.getBuf(), socketWriteBuffer);
- if (buffer.getBuf().remaining() == 0) {
- bufIter.remove();
- }
- doWrite(socketWriteBuffer, block, true);
- //here we must break if we didn't finish the write
- }
- }
- }
-
- return hasMoreDataToFlush();
- }
-
-
- private void addToBuffers(byte[] buf, int offset, int length) {
- ByteBufferHolder holder = bufferedWrites.peekLast();
- if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
- ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
- holder = new ByteBufferHolder(buffer,false);
- bufferedWrites.add(holder);
- }
- holder.getBuf().put(buf,offset,length);
- }
-
-
- private synchronized int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
+ protected synchronized int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip)
+ throws IOException {
if (flip) {
bytebuffer.flip();
writeBufferFlipped = true;
@@ -1619,9 +1535,7 @@ public class NioEndpoint extends Abstrac
pool.put(selector);
}
}
- if (block || bytebuffer.remaining() == 0) {
- // Blocking writes must empty the buffer
- // and if remaining==0 then we did empty it
+ if (bytebuffer.remaining() == 0) {
bytebuffer.clear();
writeBufferFlipped = false;
}
@@ -1629,6 +1543,7 @@ public class NioEndpoint extends Abstrac
// write further up the stack. This is to ensure the socket is only
// registered for write once as both container and user code can trigger
// write registration.
+
return written;
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1650271&r1=1650270&r2=1650271&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan 8 13:10:05 2015
@@ -70,7 +70,8 @@ public abstract class SocketWrapperBase<
*/
private final Object writeThreadLock = new Object();
- protected ByteBuffer socketWriteBuffer;
+ // TODO This being public is a temporary hack to simplify refactoring
+ public volatile ByteBuffer socketWriteBuffer;
protected volatile boolean writeBufferFlipped;
/**
@@ -289,7 +290,38 @@ public abstract class SocketWrapperBase<
*
* @throws IOException If an IO error occurs during the write
*/
- public abstract void write(boolean block, byte[] b, int off, int len) throws IOException;
+ public void write(boolean block, byte[] b, int off, int len) throws IOException {
+ // Always flush any data remaining in the buffers
+ boolean dataLeft = flush(block);
+
+ if (len == 0 || b == null) {
+ return;
+ }
+
+ // Keep writing until all the data is written or a non-blocking write
+ // leaves data in the buffer
+ while (!dataLeft && len > 0) {
+ int thisTime = transfer(b, off, len, socketWriteBuffer);
+ len = len - thisTime;
+ off = off + thisTime;
+ int written = doWrite(socketWriteBuffer, block, true);
+ if (written == 0) {
+ dataLeft = true;
+ } else {
+ dataLeft = flush(block);
+ }
+ }
+
+ // Prevent timeouts for just doing client writes
+ access();
+
+ if (!block && len > 0) {
+ // Remaining data must be buffered
+ addToBuffers(b, off, len);
+ }
+ }
+
+
/**
* Writes as much data as possible from any that remains in the buffers.
@@ -303,7 +335,54 @@ public abstract class SocketWrapperBase<
*
* @throws IOException If an IO error occurs during the write
*/
- public abstract boolean flush(boolean block) throws IOException;
+ public boolean flush(boolean block) throws IOException {
+
+ // Prevent timeout for async
+ access();
+
+ boolean dataLeft = hasMoreDataToFlush();
+
+ // Write to the socket, if there is anything to write
+ if (dataLeft) {
+ doWrite(socketWriteBuffer, block, !writeBufferFlipped);
+ }
+
+ dataLeft = hasMoreDataToFlush();
+
+ if (!dataLeft && bufferedWrites.size() > 0) {
+ Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
+ while (!hasMoreDataToFlush() && bufIter.hasNext()) {
+ ByteBufferHolder buffer = bufIter.next();
+ buffer.flip();
+ while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+ transfer(buffer.getBuf(), socketWriteBuffer);
+ if (buffer.getBuf().remaining() == 0) {
+ bufIter.remove();
+ }
+ doWrite(socketWriteBuffer, block, true);
+ //here we must break if we didn't finish the write
+ }
+ }
+ }
+
+ return hasMoreDataToFlush();
+ }
+
+
+ protected abstract int doWrite(ByteBuffer buffer, boolean block, boolean flip)
+ throws IOException;
+
+
+ protected void addToBuffers(byte[] buf, int offset, int length) {
+ ByteBufferHolder holder = bufferedWrites.peekLast();
+ if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
+ ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
+ holder = new ByteBufferHolder(buffer,false);
+ bufferedWrites.add(holder);
+ }
+ holder.getBuf().put(buf,offset,length);
+ }
+
public abstract void regsiterForEvent(boolean read, boolean write);
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org