You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/01/12 09:59:43 UTC
svn commit: r1651043 - in /tomcat/trunk: java/org/apache/coyote/ajp/
java/org/apache/coyote/http11/upgrade/ java/org/apache/tomcat/util/net/
java/org/apache/tomcat/websocket/server/
test/org/apache/catalina/nonblocking/ test/org/apache/coyote/http11/upgrade/
Author: markt
Date: Mon Jan 12 08:59:42 2015
New Revision: 1651043
URL: http://svn.apache.org/r1651043
Log:
A further round of refactoring of writes. Primary change is the
use of socketWriteBuffer by default for all writes.
Modified:
tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Mon Jan 12 08:59:42 2015
@@ -737,6 +737,7 @@ public class AjpProcessor<S> extends Abs
cping = true;
try {
socketWrapper.write(true, pongMessageArray, 0, pongMessageArray.length);
+ socketWrapper.flush(true);
} catch (IOException e) {
setErrorState(ErrorState.CLOSE_NOW, e);
}
@@ -1035,6 +1036,7 @@ public class AjpProcessor<S> extends Abs
// Request more data immediately
if (!waitingForBodyMessage) {
socketWrapper.write(true, getBodyMessageArray, 0, getBodyMessageArray.length);
+ socketWrapper.flush(true);
waitingForBodyMessage = true;
}
@@ -1442,6 +1444,7 @@ public class AjpProcessor<S> extends Abs
// Write to buffer
responseMessage.end();
socketWrapper.write(true, responseMessage.getBuffer(), 0, responseMessage.getLen());
+ socketWrapper.flush(true);
}
@@ -1455,6 +1458,7 @@ public class AjpProcessor<S> extends Abs
if (explicit && !finished) {
// Send the flush message
socketWrapper.write(true, flushMessageArray, 0, flushMessageArray.length);
+ socketWrapper.flush(true);
}
}
@@ -1490,6 +1494,7 @@ public class AjpProcessor<S> extends Abs
} else {
socketWrapper.write(true, endMessageArray, 0, endMessageArray.length);
}
+ socketWrapper.flush(true);
}
@@ -1556,6 +1561,7 @@ public class AjpProcessor<S> extends Abs
responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
responseMessage.end();
socketWrapper.write(blocking, responseMessage.getBuffer(), 0, responseMessage.getLen());
+ socketWrapper.flush(blocking);
len -= thisTime;
off += thisTime;
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java Mon Jan 12 08:59:42 2015
@@ -123,6 +123,12 @@ public class UpgradeServletOutputStream
@Override
+ public void flush() throws IOException {
+ socketWrapper.flush(listener == null);
+ }
+
+
+ @Override
public void close() throws IOException {
closeRequired = true;
socketWrapper.close();
@@ -130,7 +136,7 @@ public class UpgradeServletOutputStream
private void preWriteChecks() {
- if (listener != null && socketWrapper.hasDataToWrite()) {
+ if (listener != null && !socketWrapper.canWrite()) {
throw new IllegalStateException(sm.getString("upgrade.sis.write.ise"));
}
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Mon Jan 12 08:59:42 2015
@@ -2509,8 +2509,7 @@ public class AprEndpoint extends Abstrac
@Override
- protected int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip)
- throws IOException {
+ protected int doWrite(boolean block, boolean flip) throws IOException {
if (closed) {
throw new IOException(sm.getString("apr.closed", getSocket()));
}
@@ -2521,7 +2520,7 @@ public class AprEndpoint extends Abstrac
readLock.lock();
try {
if (getBlockingStatus() == block) {
- return doWriteInternal(bytebuffer, flip);
+ return doWriteInternal(flip);
}
} finally {
readLock.unlock();
@@ -2541,7 +2540,7 @@ public class AprEndpoint extends Abstrac
readLock.lock();
try {
writeLock.unlock();
- return doWriteInternal(bytebuffer, flip);
+ return doWriteInternal(flip);
} finally {
readLock.unlock();
}
@@ -2555,10 +2554,9 @@ public class AprEndpoint extends Abstrac
}
- private int doWriteInternal(ByteBuffer bytebuffer, boolean flip)
- throws IOException {
+ private int doWriteInternal(boolean flip) throws IOException {
if (flip) {
- bytebuffer.flip();
+ socketWriteBuffer.flip();
writeBufferFlipped = true;
}
@@ -2571,7 +2569,7 @@ public class AprEndpoint extends Abstrac
if (sslOutputBuffer.remaining() == 0) {
// Buffer was fully written last time around
sslOutputBuffer.clear();
- transfer(bytebuffer, sslOutputBuffer);
+ transfer(socketWriteBuffer, sslOutputBuffer);
sslOutputBuffer.flip();
} else {
// Buffer still has data from previous attempt to write
@@ -2585,8 +2583,9 @@ public class AprEndpoint extends Abstrac
sslOutputBuffer.position() + sslWritten);
}
} else {
- thisTime = Socket.sendb(getSocket().longValue(), bytebuffer,
- bytebuffer.position(), bytebuffer.limit() - bytebuffer.position());
+ thisTime = Socket.sendb(getSocket().longValue(),
+ socketWriteBuffer, socketWriteBuffer.position(),
+ socketWriteBuffer.limit() - socketWriteBuffer.position());
}
if (Status.APR_STATUS_IS_EAGAIN(-thisTime)) {
thisTime = 0;
@@ -2601,11 +2600,11 @@ public class AprEndpoint extends Abstrac
Integer.valueOf(-thisTime), getSocket(), this));
}
written += thisTime;
- bytebuffer.position(bytebuffer.position() + thisTime);
- } while (thisTime > 0 && bytebuffer.hasRemaining());
+ socketWriteBuffer.position(socketWriteBuffer.position() + thisTime);
+ } while (thisTime > 0 && socketWriteBuffer.hasRemaining());
- if (bytebuffer.remaining() == 0) {
- bytebuffer.clear();
+ if (socketWriteBuffer.remaining() == 0) {
+ socketWriteBuffer.clear();
writeBufferFlipped = false;
}
// If there is data left in the buffer the socket will be registered for
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Mon Jan 12 08:59:42 2015
@@ -1064,7 +1064,10 @@ public class Nio2Endpoint extends Abstra
@Override
public void close() throws IOException {
- getSocket().close();
+ Nio2Channel socket = getSocket();
+ if (socket != null) {
+ socket.close();
+ }
}
@@ -1107,167 +1110,128 @@ public class Nio2Endpoint extends Abstra
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Overridden for NIO2 to enable a gathering write to be used to write
+ * all of the remaining data in a single additional write should a
+ * non-blocking write leave data in the buffer.
+ */
@Override
- public void write(boolean block, byte[] buf, int off, int len) throws IOException {
- if (len == 0 || getSocket() == null)
- return;
-
- if (block) {
- try {
- do {
- int thisTime = transfer(buf, off, len, socketWriteBuffer);
- len = len - thisTime;
- off = off + thisTime;
- socketWriteBuffer.flip();
- while (socketWriteBuffer.hasRemaining()) {
- if (getSocket().write(socketWriteBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
- throw new EOFException(sm.getString("iob.failedwrite"));
- }
- }
- socketWriteBuffer.clear();
- } while (len > 0);
- } catch (ExecutionException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- } else {
- throw new IOException(e);
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } catch (TimeoutException e) {
- throw new SocketTimeoutException();
- }
- } else {
- // FIXME: Possible new behavior:
- // If there's non blocking abuse (like a test writing 1MB in a single
- // "non blocking" write), then block until the previous write is
- // done rather than continue buffering
- // Also allows doing autoblocking
- // Could be "smart" with coordination with the main CoyoteOutputStream to
- // indicate the end of a write
- // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
- synchronized (writeCompletionHandler) {
- if (writePending.tryAcquire()) {
- // No pending completion handler, so writing to the main buffer
- // is possible
- int thisTime = transfer(buf, off, len, socketWriteBuffer);
- len = len - thisTime;
- off = off + thisTime;
- if (len > 0) {
- // Remaining data must be buffered
- addToBuffers(buf, off, len);
- }
- flush(false, true);
- } else {
+ protected void writeNonBlocking(byte[] buf, int off, int len) throws IOException {
+ // FIXME: Possible new behavior:
+ // If there's non blocking abuse (like a test writing 1MB in a single
+ // "non blocking" write), then block until the previous write is
+ // done rather than continue buffering
+ // Also allows doing autoblocking
+ // Could be "smart" with coordination with the main CoyoteOutputStream to
+ // indicate the end of a write
+ // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
+ synchronized (writeCompletionHandler) {
+ if (writePending.tryAcquire()) {
+ // No pending completion handler, so writing to the main buffer
+ // is possible
+ int thisTime = transfer(buf, off, len, socketWriteBuffer);
+ len = len - thisTime;
+ off = off + thisTime;
+ if (len > 0) {
+ // Remaining data must be buffered
addToBuffers(buf, off, len);
}
+ flushNonBlocking(true);
+ } else {
+ addToBuffers(buf, off, len);
}
}
}
@Override
- protected int doWrite(ByteBuffer buffer, boolean block, boolean flip) throws IOException {
- // NO-OP for NIO2 since write(boolean, byte[], int, int) and
- // flush(boolean, boolean) are over-ridden.
- return 0;
+ protected int doWrite(boolean block, boolean flip) throws IOException {
+ // Only called in the non-blocking case since
+ // writeNonBlocking(byte[], int, int) and flush(boolean, boolean)
+ // are over-ridden.
+
+ int result = -1;
+ try {
+ socketWriteBuffer.flip();
+ result = socketWriteBuffer.remaining();
+ while (socketWriteBuffer.hasRemaining()) {
+ if (getSocket().write(socketWriteBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
+ throw new EOFException(sm.getString("iob.failedwrite"));
+ }
+ }
+ socketWriteBuffer.clear();
+ return result;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new IOException(e);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (TimeoutException e) {
+ throw new SocketTimeoutException();
+ }
}
@Override
- public boolean flush(boolean block) throws IOException {
- if (getError() != null) {
- throw getError();
+ protected void flushBlocking() throws IOException {
+ // Before doing a blocking flush, make sure that any pending non
+ // blocking write has completed.
+ try {
+ if (writePending.tryAcquire(getTimeout(), TimeUnit.MILLISECONDS)) {
+ writePending.release();
+ } else {
+ throw new SocketTimeoutException();
+ }
+ } catch (InterruptedException e) {
+ // Ignore
}
- return super.flush(block);
- }
+ super.flushBlocking();
+ }
@Override
- protected boolean flush(boolean block, boolean hasPermit) throws IOException {
- if (getSocket() == null)
- return false;
+ protected boolean flushNonBlocking() {
+ return flushNonBlocking(false);
+ }
- if (block) {
- try {
- if (writePending.tryAcquire(getTimeout(), TimeUnit.MILLISECONDS)) {
- writePending.release();
- } else {
- // TODO
- }
- } catch (InterruptedException e) {
- // Ignore timeout
- }
- try {
- if (bufferedWrites.size() > 0) {
- for (ByteBufferHolder holder : bufferedWrites) {
- holder.flip();
- ByteBuffer buffer = holder.getBuf();
- while (buffer.hasRemaining()) {
- if (getSocket().write(buffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
- throw new EOFException(sm.getString("iob.failedwrite"));
- }
- }
- }
- bufferedWrites.clear();
- }
+ private boolean flushNonBlocking(boolean hasPermit) {
+ synchronized (writeCompletionHandler) {
+ if (hasPermit || writePending.tryAcquire()) {
if (!writeBufferFlipped) {
socketWriteBuffer.flip();
writeBufferFlipped = true;
}
- while (socketWriteBuffer.hasRemaining()) {
- if (getSocket().write(socketWriteBuffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
- throw new EOFException(sm.getString("iob.failedwrite"));
+ if (bufferedWrites.size() > 0) {
+ // Gathering write of the main buffer plus all leftovers
+ ArrayList<ByteBuffer> arrayList = new ArrayList<>();
+ if (socketWriteBuffer.hasRemaining()) {
+ arrayList.add(socketWriteBuffer);
+ }
+ for (ByteBufferHolder buffer : bufferedWrites) {
+ buffer.flip();
+ arrayList.add(buffer.getBuf());
}
- }
- } catch (ExecutionException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
+ bufferedWrites.clear();
+ ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]);
+ getSocket().write(array, 0, array.length, getTimeout(),
+ TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler);
+ } else if (socketWriteBuffer.hasRemaining()) {
+ // Regular write
+ getSocket().write(socketWriteBuffer, getTimeout(),
+ TimeUnit.MILLISECONDS, socketWriteBuffer, writeCompletionHandler);
} else {
- throw new IOException(e);
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } catch (TimeoutException e) {
- throw new SocketTimeoutException();
- }
- socketWriteBuffer.clear();
- writeBufferFlipped = false;
- return false;
- } else {
- synchronized (writeCompletionHandler) {
- if (hasPermit || writePending.tryAcquire()) {
- if (!writeBufferFlipped) {
- socketWriteBuffer.flip();
- writeBufferFlipped = true;
- }
- if (bufferedWrites.size() > 0) {
- // Gathering write of the main buffer plus all leftovers
- ArrayList<ByteBuffer> arrayList = new ArrayList<>();
- if (socketWriteBuffer.hasRemaining()) {
- arrayList.add(socketWriteBuffer);
- }
- for (ByteBufferHolder buffer : bufferedWrites) {
- buffer.flip();
- arrayList.add(buffer.getBuf());
- }
- bufferedWrites.clear();
- ByteBuffer[] array = arrayList.toArray(new ByteBuffer[arrayList.size()]);
- getSocket().write(array, 0, array.length, getTimeout(),
- TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler);
- } else if (socketWriteBuffer.hasRemaining()) {
- // Regular write
- getSocket().write(socketWriteBuffer, getTimeout(),
- TimeUnit.MILLISECONDS, socketWriteBuffer, writeCompletionHandler);
- } else {
- // Nothing was written
- writePending.release();
- socketWriteBuffer.clear();
- writeBufferFlipped = false;
- }
+ // Nothing was written
+ writePending.release();
+ socketWriteBuffer.clear();
+ writeBufferFlipped = false;
}
- return hasDataToWrite();
}
+ return hasDataToWrite();
}
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Jan 12 08:59:42 2015
@@ -1472,7 +1472,10 @@ public class NioEndpoint extends Abstrac
@Override
public void close() throws IOException {
- getSocket().close();
+ NioChannel socket = getSocket();
+ if (socket != null) {
+ socket.close();
+ }
}
@@ -1509,10 +1512,10 @@ public class NioEndpoint extends Abstrac
@Override
- protected synchronized int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip)
+ protected synchronized int doWrite(boolean block, boolean flip)
throws IOException {
if (flip) {
- bytebuffer.flip();
+ socketWriteBuffer.flip();
writeBufferFlipped = true;
}
@@ -1525,7 +1528,7 @@ public class NioEndpoint extends Abstrac
// Ignore
}
try {
- written = pool.write(bytebuffer, getSocket(), selector, writeTimeout, block);
+ written = pool.write(socketWriteBuffer, getSocket(), selector, writeTimeout, block);
// Make sure we are flushed
do {
if (getSocket().flush(true, selector, writeTimeout)) break;
@@ -1535,8 +1538,8 @@ public class NioEndpoint extends Abstrac
pool.put(selector);
}
}
- if (bytebuffer.remaining() == 0) {
- bytebuffer.clear();
+ if (socketWriteBuffer.remaining() == 0) {
+ socketWriteBuffer.clear();
writeBufferFlipped = false;
}
// If there is data left in the buffer the socket will be registered for
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Mon Jan 12 08:59:42 2015
@@ -188,9 +188,9 @@ public abstract class SocketWrapperBase<
}
/**
- * Checks to see if there is any writes pending and if there is calls
+ * Checks to see if there are any writes pending and if there are calls
* {@link #registerWriteInterest()} to trigger a callback once the pending
- * write has completed.
+ * writes have completed.
* <p>
* Note: Once this method has returned <code>false</code> it <b>MUST NOT</b>
* be called again until the pending write has completed and the
@@ -202,13 +202,19 @@ public abstract class SocketWrapperBase<
* written otherwise <code>false</code>
*/
public boolean isReadyForWrite() {
- boolean result = !hasDataToWrite();
+ boolean result = canWrite();
if (!result) {
registerWriteInterest();
}
return result;
}
+
+ public boolean canWrite() {
+ return !writeBufferFlipped && socketWriteBuffer.hasRemaining() &&
+ bufferedWrites.size() == 0;
+ }
+
public void addDispatch(DispatchType dispatchType) {
synchronized (dispatches) {
dispatches.add(dispatchType);
@@ -286,79 +292,116 @@ public abstract class SocketWrapperBase<
public abstract void unRead(ByteBuffer input);
public abstract void close() throws IOException;
+
/**
* Writes the provided data to the socket, buffering any remaining data if
- * used in non-blocking mode. If any data remains in the buffers from a
- * previous write then that data will be written before this data. It is
- * therefore unnecessary to call flush() before calling this method.
+ * used in non-blocking mode.
*
* @param block <code>true<code> if a blocking write should be used,
* otherwise a non-blocking write will be used
- * @param b The byte array containing the data to be written
+ * @param buf The byte array containing the data to be written
* @param off The offset within the byte array of the data to be written
* @param len The length of the data to be written
*
* @throws IOException If an IO error occurs during the write
*/
- public void write(boolean block, byte[] b, int off, int len) throws IOException {
- // Always flush any data remaining in the buffers
- boolean dataLeft = flush(block, true);
-
- if (len == 0 || b == null) {
+ public void write(boolean block, byte[] buf, int off, int len) throws IOException {
+ if (len == 0 || buf == null || getSocket() == null) {
return;
}
- // Keep writing until all the data is written or a non-blocking write
- // leaves data in the buffer
- while (!dataLeft && len > 0) {
- int thisTime = transfer(b, off, len, socketWriteBuffer);
- len = len - thisTime;
- off = off + thisTime;
- int written = doWrite(socketWriteBuffer, block, true);
- if (written == 0) {
- dataLeft = true;
- } else {
- dataLeft = flush(block, true);
- }
+ // While the implementations for blocking and non-blocking writes are
+ // very similar they have been split into separate methods to allow
+ // sub-classes to override them individually. NIO2, for example,
+ // overrides the non-blocking write but not the blocking write.
+ if (block) {
+ writeBlocking(buf, off, len);
+ } else {
+ writeNonBlocking(buf, off, len);
}
- // Prevent timeouts for just doing client writes
+ // Prevent timeouts
access();
+ }
- if (!block && len > 0) {
- // Remaining data must be buffered
- addToBuffers(b, off, len);
+
+ /**
+ * Transfers the data to the socket write buffer (writing that data to the
+ * socket if the buffer fills up using a blocking write) until all the data
+ * has been transferred and space remains in the socket write buffer.
+ *
+ * @param buf The byte array containing the data to be written
+ * @param off The offset within the byte array of the data to be written
+ * @param len The length of the data to be written
+ *
+ * @throws IOException If an IO error occurs during the write
+ */
+ protected void writeBlocking(byte[] buf, int off, int len) throws IOException {
+ // Note: There is an implementation assumption that if the switch from
+ // non-blocking to blocking has been made then any pending
+ // non-blocking writes were flushed at the time the switch
+ // occurred.
+
+ // Keep writing until all the data has been transferred to the socket
+ // write buffer and space remains in that buffer
+ int thisTime = transfer(buf, off, len, socketWriteBuffer);
+ while (socketWriteBuffer.remaining() == 0) {
+ len = len - thisTime;
+ off = off + thisTime;
+ // TODO: There is an assumption here that the blocking write will
+ // block until all the data is written or the write times out.
+ // Document this assumption in the Javadoc for doWrite(),
+ // ensure it is valid for all implementations of doWrite() and
+ // then review all callers of doWrite() and review what
+ // simplifications this offers.
+ doWrite(true, true);
+ thisTime = transfer(buf, off, len, socketWriteBuffer);
}
}
/**
- * Writes as much data as possible from any that remains in the buffers.
+ * Transfers the data to the socket write buffer (writing that data to the
+ * socket if the buffer fills up using a non-blocking write) until either
+ * all the data has been transferred and space remains in the socket write
+ * buffer or a non-blocking write leaves data in the socket write buffer.
*
- * @param block <code>true<code> if a blocking write should be used,
- * otherwise a non-blocking write will be used
- *
- * @return <code>true</code> if data remains to be flushed after this method
- * completes, otherwise <code>false</code>. In blocking mode
- * therefore, the return value should always be <code>false</code>
+ * @param buf The byte array containing the data to be written
+ * @param off The offset within the byte array of the data to be written
+ * @param len The length of the data to be written
*
* @throws IOException If an IO error occurs during the write
*/
- public boolean flush(boolean block) throws IOException {
- return flush(block, false);
+ protected void writeNonBlocking(byte[] buf, int off, int len) throws IOException {
+ if (!writeBufferFlipped) {
+ int thisTime = transfer(buf, off, len, socketWriteBuffer);
+ len = len - thisTime;
+ while (socketWriteBuffer.remaining() == 0) {
+ off = off + thisTime;
+ if (doWrite(false, !writeBufferFlipped) == 0) {
+ break;
+ }
+ if (writeBufferFlipped) {
+ thisTime = 0;
+ } else {
+ thisTime = transfer(buf, off, len, socketWriteBuffer);
+ }
+ len = len - thisTime;
+ }
+ }
+
+ if (len > 0) {
+ // Remaining data must be buffered
+ addToBuffers(buf, off, len);
+ }
}
/**
* Writes as much data as possible from any that remains in the buffers.
- * This method exists for those implementations (e.g. NIO2) that need
- * slightly different behaviour depending on if flush() was called directly
- * or by another method in this class or a sub-class.
- *
- * @param block <code>true<code> if a blocking write should be used,
- * otherwise a non-blocking write will be used
- * @param internal <code>true<code> if flush() was called by another method
- * in class or sub-class
+ *
+ * @param block <code>true<code> if a blocking write should be used,
+ * otherwise a non-blocking write will be used
*
* @return <code>true</code> if data remains to be flushed after this method
* completes, otherwise <code>false</code>. In blocking mode
@@ -366,16 +409,57 @@ public abstract class SocketWrapperBase<
*
* @throws IOException If an IO error occurs during the write
*/
- protected boolean flush(boolean block, boolean internal) throws IOException {
+ public boolean flush(boolean block) throws IOException {
+ if (getSocket() == null) {
+ return false;
+ }
- // Prevent timeout for async
+ if (getError() != null) {
+ throw getError();
+ }
+
+ boolean result = false;
+ if (block) {
+ // A blocking flush will always empty the buffer.
+ flushBlocking();
+ } else {
+ result = flushNonBlocking();
+ }
+
+ // Prevent timeouts
access();
+ return result;
+ }
+
+
+ protected void flushBlocking() throws IOException {
+ doWrite(true, !writeBufferFlipped);
+
+ if (bufferedWrites.size() > 0) {
+ Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
+ while (!hasMoreDataToFlush() && bufIter.hasNext()) {
+ ByteBufferHolder buffer = bufIter.next();
+ buffer.flip();
+ while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+ transfer(buffer.getBuf(), socketWriteBuffer);
+ if (buffer.getBuf().remaining() == 0) {
+ bufIter.remove();
+ }
+ doWrite(true, !writeBufferFlipped);
+ }
+ }
+ }
+
+ }
+
+
+ protected boolean flushNonBlocking() throws IOException {
boolean dataLeft = hasMoreDataToFlush();
// Write to the socket, if there is anything to write
if (dataLeft) {
- doWrite(socketWriteBuffer, block, !writeBufferFlipped);
+ doWrite(false, !writeBufferFlipped);
}
dataLeft = hasMoreDataToFlush();
@@ -385,13 +469,12 @@ public abstract class SocketWrapperBase<
while (!hasMoreDataToFlush() && bufIter.hasNext()) {
ByteBufferHolder buffer = bufIter.next();
buffer.flip();
- while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+ while (!hasMoreDataToFlush() && buffer.getBuf().remaining() > 0) {
transfer(buffer.getBuf(), socketWriteBuffer);
if (buffer.getBuf().remaining() == 0) {
bufIter.remove();
}
- doWrite(socketWriteBuffer, block, true);
- //here we must break if we didn't finish the write
+ doWrite(false, !writeBufferFlipped);
}
}
}
@@ -400,8 +483,7 @@ public abstract class SocketWrapperBase<
}
- protected abstract int doWrite(ByteBuffer buffer, boolean block, boolean flip)
- throws IOException;
+ protected abstract int doWrite(boolean block, boolean flip) throws IOException;
protected void addToBuffers(byte[] buf, int offset, int length) {
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Mon Jan 12 08:59:42 2015
@@ -88,7 +88,7 @@ public class WsRemoteEndpointImplServer
// was written
return;
}
- boolean complete = true;
+ boolean complete = false;
try {
// If this is false there will be a call back when it is true
while (sos.isReady()) {
@@ -103,6 +103,7 @@ public class WsRemoteEndpointImplServer
}
}
if (complete) {
+ sos.flush();
wsWriteTimeout.unregister(this);
clearHandler(null, useDispatch);
if (close) {
@@ -117,9 +118,9 @@ public class WsRemoteEndpointImplServer
clearHandler(ioe, useDispatch);
close();
}
+
if (!complete) {
// Async write is in progress
-
long timeout = getSendTimeout();
if (timeout > 0) {
// Register with timeout thread
Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Mon Jan 12 08:59:42 2015
@@ -221,19 +221,19 @@ public class TestNonBlockingAPI extends
boolean found = false;
for (int i = totalBodyRead; i < (totalBodyRead + line.length()); i++) {
if (DATA[i] != resultBytes[lineStart + i - totalBodyRead]) {
- int dataStart = i - 16;
+ int dataStart = i - 64;
if (dataStart < 0) {
dataStart = 0;
}
- int dataEnd = i + 16;
+ int dataEnd = i + 64;
if (dataEnd > DATA.length) {
dataEnd = DATA.length;
}
- int resultStart = lineStart + i - totalBodyRead - 16;
+ int resultStart = lineStart + i - totalBodyRead - 64;
if (resultStart < 0) {
resultStart = 0;
}
- int resultEnd = lineStart + i - totalBodyRead + 16;
+ int resultEnd = lineStart + i - totalBodyRead + 64;
if (resultEnd > resultString.length()) {
resultEnd = resultString.length();
}
@@ -492,25 +492,21 @@ public class TestNonBlockingAPI extends
@Override
public void onTimeout(AsyncEvent event) throws IOException {
log.info("onTimeout");
-
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
log.info("onStartAsync");
-
}
@Override
public void onError(AsyncEvent event) throws IOException {
log.info("AsyncListener.onError");
-
}
@Override
public void onComplete(AsyncEvent event) throws IOException {
log.info("onComplete");
-
}
});
// step 2 - notify on read
Modified: tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java?rev=1651043&r1=1651042&r2=1651043&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java (original)
+++ tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java Mon Jan 12 08:59:42 2015
@@ -47,7 +47,6 @@ import static org.apache.catalina.startu
import org.apache.catalina.Context;
import org.apache.catalina.startup.Tomcat;
import org.apache.catalina.startup.TomcatBaseTest;
-import org.apache.catalina.util.IOTools;
public class TestUpgrade extends TomcatBaseTest {
@@ -234,8 +233,12 @@ public class TestUpgrade extends TomcatB
try (ServletInputStream sis = connection.getInputStream();
ServletOutputStream sos = connection.getOutputStream()){
-
- IOTools.flow(sis, sos);
+ byte[] buffer = new byte[8192];
+ int read;
+ while ((read = sis.read(buffer)) >= 0) {
+ sos.write(buffer, 0, read);
+ sos.flush();
+ }
} catch (IOException ioe) {
throw new IllegalStateException(ioe);
}
@@ -274,7 +277,7 @@ public class TestUpgrade extends TomcatB
private class EchoReadListener extends NoOpReadListener {
- private byte[] buffer = new byte[8096];
+ private byte[] buffer = new byte[8192];
@Override
public void onDataAvailable() {
@@ -288,6 +291,7 @@ public class TestUpgrade extends TomcatB
throw new IOException("Unable to echo data. " +
"isReady() returned false");
}
+ sos.flush();
}
}
} catch (IOException ioe) {
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org