You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2007/12/04 01:06:31 UTC
svn commit: r600737 - in /tomcat/sandbox/gdev6x: java/org/apache/catalina/
java/org/apache/catalina/connector/ java/org/apache/catalina/core/
java/org/apache/coyote/http11/ java/org/apache/tomcat/util/net/
webapps/docs/ webapps/docs/config/
Author: fhanik
Date: Mon Dec 3 16:06:24 2007
New Revision: 600737
URL: http://svn.apache.org/viewvc?rev=600737&view=rev
Log:
implemented buffered non blocking write for comet events
Modified:
tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java
tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java
tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java
tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java
tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java
tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java
tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/sandbox/gdev6x/webapps/docs/aio.xml
tomcat/sandbox/gdev6x/webapps/docs/changelog.xml
tomcat/sandbox/gdev6x/webapps/docs/config/http.xml
Modified: tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java Mon Dec 3 16:06:24 2007
@@ -131,20 +131,22 @@
* client a notice that the server has no more data to send as part of this
* request. The servlet should perform any needed cleanup as if it had recieved
* an END or ERROR event.
- * Invoking this method during a event, will cause the session to close
+ * Invoking this method during a event, will cause the Comet session to close
* immediately after the event method has finished.
- * Invoking this method asynchrously will not cause the session to close
- * until another event occurred, most likely a timeout.
- * If you wish to signal to the container
- * that the session should end sooner rather than later when this method is invoked
- * asycnhronously, then issue a
- * register(OP_CALLBACK) immediately after this method has been invoked.
+ * Invoking this method asynchrously will cause the Comet session to end after the
+ * END event has been processed
*
* @see #register(int)
*/
public void close() throws IOException;
/**
+ * Returns true if #close() has been invoked
+ * @return boolean
+ */
+ public boolean isClosed();
+
+ /**
* Sets the timeout for this Comet connection. Please NOTE, that the implementation
* of a per connection timeout is OPTIONAL and MAY NOT be implemented.<br/>
* This method sets the timeout in milliseconds of idle time on the connection.
@@ -171,11 +173,14 @@
* a) Blocking IO - standard servlet usage<br/>
* b) Register for READ events when data arrives<br/>
* Tomcat Comet allows you to configure for additional options:<br/>
- * the <code>configureBlocking(false)</code> bit signals whether writing and reading from the request
- * or writing to the response will be non blocking.<br/>
- * the <code>configureBlocking(true)</code> bit signals the container you wish for read and write to be done in a blocking fashion
- * @param blocking - true to make read and writes blocking
- * @throws IllegalStateException - if this method is invoked outside of the BEGIN event
+ * the <code>configureBlocking(false)</code> bit signals whether writing to the response will be non blocking.<br/>
+ * the <code>configureBlocking(true)</code> bit signals the container you wish for read and write to be done in a blocking fashion<br/>
+ * when parameter is set to false, writes will be buffered and dispatched to the servlet container
+ * to complete the write asynchronously. The size of the write buffer can be configured.
+ * If ServletRequest.(getInputStream/getWriter) is invoked with more data than
+ * it can handle, an IO Exception will be thrown.
+ * @param blocking - true to make writes blocking, false to make writes non blocking
+ * @throws IllegalStateException - if this method is invoked outside of the BEGIN event or if blocking has already been configured
* @see #isReadable()
* @see #isWriteable()
*/
@@ -188,10 +193,10 @@
public boolean isBlocking();
/**
- * OP_CALLBACK - receive a CALLBACK event from the container
+ * OP_CALLBACK - receive a CALLBACK event from the container, on a Tomcat worker thread
* OP_READ - receive a READ event when the connection has data to be read
* OP_WRITE - receive a WRITE event when the connection is able to receive data to be written
- * @see #register(int)
+ * @see #interestOps(int)
*/
public static class CometOperation {
//currently map these to the same values as org.apache.tomcat.util.net.PollerInterest
@@ -204,6 +209,7 @@
* Registers the Comet connection with the container for IO and event notifications.
* Each time this method is invoked, the operations are reset to the operations parameter value.
* To unregister an operation, simple do interestOps(interestOps() & (~CometOperation.OP_WRITE))
+ * This method can be invoked synchronously or asynchronously (by a non Tomcat worker thread) to change the operations
* @param operations
* @throws IllegalStateException - if you are trying to register with a socket that already is registered
* or if the operation you are trying to register is invalid.
@@ -243,5 +249,5 @@
* @return boolean
*/
public boolean isReadable();
-
+
}
Modified: tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java Mon Dec 3 16:06:24 2007
@@ -83,7 +83,13 @@
* Blocking or not blocking
*/
protected boolean blocking = true;
-
+
+ /**
+ * Closed?
+ */
+ protected boolean closed = false;
+
+
// --------------------------------------------------------- Public Methods
/**
@@ -94,6 +100,7 @@
response = null;
blocking = true;
cometOperations = 0;
+ closed = false;
}
public void setEventType(EventType eventType) {
@@ -104,12 +111,20 @@
this.eventSubType = eventSubType;
}
+ public boolean isClosed() {
+ return closed;
+ }
+
public void close() throws IOException {
+ if (!closed) closed = true;
if (request == null) {
throw new IllegalStateException(sm.getString("cometEvent.nullRequest"));
}
request.setComet(false);
response.finishResponse();
+ //if this is a worker thread, the comet operation will be reset
+ //otherwise, we are signaling to end the request
+ interestOps(CometEvent.CometOperation.OP_CALLBACK);
}
public EventSubType getEventSubType() {
Modified: tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java Mon Dec 3 16:06:24 2007
@@ -213,8 +213,8 @@
try {
SystemLogHandler.startCapture();
if (comet) {
- filterChain.doFilterEvent(request.getEvent());
request.setComet(true);
+ filterChain.doFilterEvent(request.getEvent());
} else {
filterChain.doFilter(request.getRequest(),
response.getResponse());
@@ -227,8 +227,8 @@
}
} else {
if (comet) {
- filterChain.doFilterEvent(request.getEvent());
request.setComet(true);
+ filterChain.doFilterEvent(request.getEvent());
} else {
filterChain.doFilter
(request.getRequest(), response.getResponse());
Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java Mon Dec 3 16:06:24 2007
@@ -692,6 +692,14 @@
public int getSocketBuffer() {
return socketBuffer;
}
+
+ public void setBufferedWriteSize(int size) {
+ outputBuffer.setBufferedWriteSize(size);
+ }
+
+ public int getBufferedWriteSize() {
+ return outputBuffer.getBufferedWriteSize();
+ }
/**
* Set the upload timeout.
@@ -744,20 +752,40 @@
public SocketState event(SocketStatus status)
throws IOException {
- RequestInfo rp = request.getRequestProcessor();
+ NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) socket.getAttachment(false);
+ if (status == SocketStatus.OPEN_WRITE) {
+ //flush out buffered write data
+ if (outputBuffer.hasDataToWrite()) {
+ int cnt = 0;
+ do {
+ cnt = outputBuffer.flushBuffer(false);
+ }while (cnt>0);
+ }
+ //return if we have more data to write
+ if (outputBuffer.hasDataToWrite()) return SocketState.LONG;
+
+ //return if the comet processor wasn't registered for WRITE
+ if (attach!=null && (attach.getCometOps()&PollerInterest.WRITE)!=PollerInterest.WRITE) {
+ return SocketState.LONG;
+ }
+ }
+ RequestInfo rp = request.getRequestProcessor();
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
error = !adapter.event(request, response, status);
if ( !error ) {
- NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
- if (attach != null) {
- attach.setComet(comet);
- if (!comet) {
- //reset the timeout
- attach.setTimeout(endpoint.getSocketProperties().getSoTimeout());
- }
+ try {
+
+ if (attach != null) {
+ attach.setComet(comet);
+ if (!comet) {
+ //reset the timeout
+ attach.setTimeout(endpoint.getSocketProperties().getSoTimeout());
+ }
+ }
+ } catch (Exception ex) {
}
}
} catch (InterruptedIOException e) {
@@ -992,16 +1020,6 @@
localName = null;
remotePort = -1;
localPort = -1;
- //fix the synchronization scenario due to
- //dual comet flags.
- //while the response/request
- //might already be recycled, this circumvents the bug
- //and should not be an expensive operation
- //however, this is a TODO and FIXME
- //as it would be better coordinate the recycling of the request/response
- //instead
- response.recycle();
- request.recycle();
}
@@ -1242,10 +1260,11 @@
socket.getPoller().cometInterest(socket);
} else if (actionCode == ActionCode.ACTION_COMET_CONFIGURE_BLOCKING) {
MutableBoolean bool = (MutableBoolean)param;
- if ( bool.get() ) throw new IllegalStateException("Not yet implemented");
RequestInfo rp = request.getRequestProcessor();
if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE )
throw new IllegalStateException("Can only be configured during an event.");
+ inputBuffer.setBlocking(bool.get());
+ outputBuffer.setBlocking(bool.get());
} else if (actionCode == ActionCode.ACTION_COMET_READABLE) {
MutableBoolean bool = (MutableBoolean)param;
try {
Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java Mon Dec 3 16:06:24 2007
@@ -39,6 +39,7 @@
import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.NioEndpoint.Handler;
+import org.apache.tomcat.util.net.PollerInterest;
import org.apache.tomcat.util.net.SSLImplementation;
import org.apache.tomcat.util.net.SecureNioChannel;
import org.apache.tomcat.util.net.SocketStatus;
@@ -219,7 +220,7 @@
private int socketCloseDelay=-1;
private boolean disableUploadTimeout = true;
private int socketBuffer = 9000;
-
+ private int bufferedWriteSize = 64*1024;
private Adapter adapter;
private Http11ConnectionHandler cHandler;
@@ -533,6 +534,10 @@
this.processorCache = processorCache;
}
+ public void setBufferedWriteSize(int bufferedWriteSize) {
+ this.bufferedWriteSize = bufferedWriteSize;
+ }
+
public void setOomParachute(int oomParachute) {
ep.setOomParachute(oomParachute);
setAttribute("oomParachute",oomParachute);
@@ -682,7 +687,9 @@
if (log.isDebugEnabled()) log.debug("Keeping processor["+result);
//add correct poller events here based on Comet stuff
NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
- socket.getPoller().add(socket,att.getCometOps());
+ int ops = att.getCometOps();
+ if (result.outputBuffer.hasDataToWrite()) ops = ops|PollerInterest.WRITE;
+ socket.getPoller().add(socket,ops);
}
}
}
@@ -725,7 +732,9 @@
connections.put(socket, processor);
if (processor.comet) {
NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
- socket.getPoller().add(socket,att.getCometOps());
+ int ops = att.getCometOps();
+ if (processor.outputBuffer.hasDataToWrite()) ops = ops|PollerInterest.WRITE;
+ socket.getPoller().add(socket,ops);
} else {
socket.getPoller().add(socket);
}
@@ -775,6 +784,7 @@
processor.setCompressableMimeTypes(proto.compressableMimeTypes);
processor.setRestrictedUserAgents(proto.restrictedUserAgents);
processor.setSocketBuffer(proto.socketBuffer);
+ processor.setBufferedWriteSize(proto.bufferedWriteSize);
processor.setMaxSavePostSize(proto.maxSavePostSize);
processor.setServer(proto.server);
register(processor);
@@ -842,6 +852,10 @@
public int getProcessorCache() {
return processorCache;
+ }
+
+ public int getBufferedWriteSize() {
+ return bufferedWriteSize;
}
public int getOomParachute() {
Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java Mon Dec 3 16:06:24 2007
@@ -28,9 +28,9 @@
import org.apache.tomcat.util.buf.MessageBytes;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.NioSelectorPool;
import org.apache.tomcat.util.res.StringManager;
-import org.apache.tomcat.util.net.NioEndpoint;
/**
* Implementation of InputBuffer which provides HTTP request header parsing as
@@ -188,7 +188,10 @@
*/
protected int lastActiveFilter;
-
+ /**
+ * Flag used only for Comet requests/responses
+ */
+ protected boolean blocking = true;
// ------------------------------------------------------------- Properties
@@ -206,6 +209,10 @@
return socket;
}
+ public boolean isBlocking() {
+ return blocking;
+ }
+
public void setSelectorPool(NioSelectorPool pool) {
this.pool = pool;
}
@@ -283,6 +290,10 @@
this.swallowInput = swallowInput;
}
+ public void setBlocking(boolean blocking) {
+ this.blocking = blocking;
+ }
+
// --------------------------------------------------------- Public Methods
/**
* Returns true if there are bytes available from the socket layer
@@ -328,7 +339,7 @@
parsingRequestLineQPos = -1;
headerData.recycle();
swallowInput = true;
-
+ blocking = true;
}
@@ -373,7 +384,7 @@
parsingRequestLineQPos = -1;
headerData.recycle();
swallowInput = true;
-
+ blocking = true;
}
@@ -889,7 +900,10 @@
throws IOException {
if (pos >= lastValid) {
- if (!fill(true,true)) //read body, must be blocking, as the thread is inside the app
+ //since the filters are not stateful
+ //we can't issue non blocking reads.
+ //It simply doesn't work.
+ if (!fill(true,true))
return -1;
}
Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Mon Dec 3 16:06:24 2007
@@ -25,6 +25,7 @@
import org.apache.coyote.ActionCode;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
+import org.apache.tomcat.util.MutableInteger;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.CharChunk;
import org.apache.tomcat.util.buf.MessageBytes;
@@ -34,8 +35,7 @@
import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.NioSelectorPool;
import org.apache.tomcat.util.res.StringManager;
-import java.io.EOFException;
-import org.apache.tomcat.util.MutableInteger;
+import java.nio.BufferOverflowException;
/**
* Output buffer.
@@ -77,8 +77,6 @@
} else {
bbufLimit = (headerBufferSize / 1500 + 1) * 1500;
}
- //bbuf = ByteBuffer.allocateDirect(bbufLimit);
-
outputStreamOutputBuffer = new SocketOutputBuffer();
filterLibrary = new OutputFilter[0];
@@ -183,6 +181,31 @@
*/
protected int lastActiveFilter;
+ /**
+ * Flag used only for Comet requests/responses
+ */
+ protected boolean blocking = true;
+
+ /**
+ * Track if the byte buffer is flipped
+ */
+ protected boolean flipped = false;
+
+ /**
+ * For "non-blocking" writes use an external buffer
+ */
+ protected ByteBuffer bufferedWrite = null;
+
+ /**
+ * The max size of the buffered write buffer
+ */
+ protected int bufferedWriteSize = 64*1024; //64k default write buffer
+
+ /**
+ * track if buffered buffer is flipped
+ */
+ protected boolean bufflipped = false;
+
// ------------------------------------------------------------- Properties
@@ -193,6 +216,19 @@
this.socket = socket;
}
+ public void setBlocking(boolean blocking) {
+ this.blocking = blocking;
+ bufflipped = false;
+ if (blocking)
+ bufferedWrite = null;
+ else
+ bufferedWrite = ByteBuffer.allocate(bufferedWriteSize);
+ }
+
+ public void setBufferedWriteSize(int bufferedWriteSize) {
+ this.bufferedWriteSize = bufferedWriteSize;
+ }
+
/**
* Get the underlying socket input stream.
*/
@@ -200,6 +236,36 @@
return socket;
}
+ public boolean isBlocking() {
+ return blocking;
+ }
+
+ public ByteBuffer getBufferedWrite() {
+ return bufferedWrite;
+ }
+
+ public boolean hasBufferedData() {
+ if (getBufferedWrite()!=null) {
+ if (bufflipped) return getBufferedWrite().hasRemaining();
+ else return getBufferedWrite().position()>0;
+ }else {
+ return false;
+ }
+ }
+
+ public boolean hasDataToWrite() {
+ if (!hasBufferedData()) {
+ if (flipped) return socket.getBufHandler().getWriteBuffer().hasRemaining();
+ else return socket.getBufHandler().getWriteBuffer().position()>0;
+ }else {
+ return true;
+ }
+ }
+
+ public int getBufferedWriteSize() {
+ return bufferedWriteSize;
+ }
+
public void setSelectorPool(NioSelectorPool pool) {
this.pool = pool;
}
@@ -286,19 +352,14 @@
*/
public void flush()
throws IOException {
-
if (!committed) {
// Send the connector a request for commit. The connector should
// then validate the headers, send them (using sendHeader) and
// set the filters accordingly.
response.action(ActionCode.ACTION_COMMIT, null);
-
}
-
- // Flush the current buffer
- flushBuffer();
-
+ flushBuffer(isBlocking());
}
@@ -322,6 +383,10 @@
* connection.
*/
public void recycle() {
+ recycle(true);
+ }
+ public void recycle(boolean clearbuf) {
+
// Recycle filters
for (int i = 0; i <= lastActiveFilter; i++) {
activeFilters[i].recycle();
@@ -329,7 +394,7 @@
// Recycle Request object
response.recycle();
- socket.getBufHandler().getWriteBuffer().clear();
+ if (clearbuf && socket!=null) socket.getBufHandler().getWriteBuffer().clear();
socket = null;
pos = 0;
@@ -337,7 +402,8 @@
committed = false;
finished = false;
lastWrite.set(1);
-
+ setBlocking(true);
+ flipped = false;
}
@@ -348,21 +414,7 @@
* to parse the next HTTP request.
*/
public void nextRequest() {
-
- // Recycle Request object
- response.recycle();
-
- // Recycle filters
- for (int i = 0; i <= lastActiveFilter; i++) {
- activeFilters[i].recycle();
- }
-
- // Reset pointers
- pos = 0;
- lastActiveFilter = -1;
- committed = false;
- finished = false;
-
+ recycle(false);//proper pipeline support?
}
@@ -389,14 +441,18 @@
if (lastActiveFilter != -1)
activeFilters[lastActiveFilter].end();
- flushBuffer();
+ flushBuffer(true); //dont return upon call of close()
finished = true;
}
public boolean isWritable() {
- return lastWrite.get()>0;
+ if (lastWrite.get()>0) {
+ return !hasDataToWrite();
+ }else {
+ return false;
+ }
}
// ------------------------------------------------ HTTP/1.1 Output Methods
@@ -408,9 +464,8 @@
throws IOException {
if (!committed) {
- //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0
socket.getBufHandler() .getWriteBuffer().put(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length);
- writeToSocket(socket.getBufHandler() .getWriteBuffer(),true,true);
+ writeToSocket(socket.getBufHandler() .getWriteBuffer(),true,true);//ack is always blocking
}
}
@@ -424,8 +479,10 @@
* @todo Fix non blocking write properly
*/
private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
- if ( flip ) bytebuffer.flip();
-
+ if ( flip ) {
+ bytebuffer.flip();
+ flipped = true;
+ }
int written = 0;
NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
if ( att == null ) throw new IOException("Key must be cancelled");
@@ -440,12 +497,18 @@
written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout, block,lastWrite);
//make sure we are flushed
do {
- if (socket.flush(true,selector,writeTimeout,lastWrite)) break;
+ //force writing of the net buffer on SSL
+ if (socket.flush(true,selector,writeTimeout,lastWrite)) break;
}while ( true );
}finally {
if ( selector != null ) getSelectorPool().put(selector);
}
- if ( block ) bytebuffer.clear(); //only clear
+ if ( block || bytebuffer.remaining()==0) {
+ //blocking writes must empty the buffer
+ //and if remaining==0 then we did empty it
+ bytebuffer.clear();
+ flipped = false;
+ }
this.total = 0;
return written;
}
@@ -612,13 +675,20 @@
int total = 0;
private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
- while (socket.getBufHandler().getWriteBuffer().remaining() < length) {
- flushBuffer();
+ if (isBlocking()) {
+ while (socket.getBufHandler().getWriteBuffer().remaining() < length) {
+ flushBuffer(true);
+ }
+ socket.getBufHandler().getWriteBuffer().put(buf, offset, length);
+ total += length;
+ NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment) socket.getAttachment(false);
+ if (ka != null)
+ ka.access(); //prevent timeouts for just doing client writes
+ } else {
+ if (bufferedWrite.remaining()<length) throw new IOException("BufferOverflowException:Unable to fit buffered write data in buffer.");
+ if (bufflipped) throw new IOException("Invalid write attempt, previous buffered write not completed.");
+ bufferedWrite.put(buf, offset, length);
}
- socket.getBufHandler().getWriteBuffer().put(buf, offset, length);
- total += length;
- NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
- if ( ka!= null ) ka.access();//prevent timeouts for just doing client writes
}
@@ -753,25 +823,58 @@
/**
* Callback to write data from the buffer.
+ * @return the number of bytes written
*/
- protected void flushBuffer()
+ protected int flushBuffer(boolean block)
throws IOException {
-
+ int result = 0;
//prevent timeout for async,
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if (key != null) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
attach.access();
}
-
+
//write to the socket, if there is anything to write
- if (socket.getBufHandler().getWriteBuffer().position() > 0) {
- socket.getBufHandler().getWriteBuffer().flip();
- writeToSocket(socket.getBufHandler().getWriteBuffer(),true, false);
+ if ((flipped && socket.getBufHandler().getWriteBuffer().remaining()>0) ||
+ (!flipped && socket.getBufHandler().getWriteBuffer().position() > 0) ) {
+ result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped);
+ } else if (bufferedWrite!=null) {
+ if ((bufflipped && bufferedWrite.remaining()>0)||(!bufflipped && bufferedWrite.position()>0)) {
+ //transfer to the socket buffer
+ if (!bufflipped) {
+ bufferedWrite.flip();
+ bufflipped = true;
+ }
+ transfer(bufferedWrite, socket.getBufHandler().getWriteBuffer());
+ if (bufferedWrite.remaining() == 0) {
+ bufferedWrite.clear();
+ bufflipped = false;
+ }
+ result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
+ }
+ }
+ return result;
+ }
+
+ protected int transfer(ByteBuffer from, ByteBuffer to) {
+ int remaining = from.remaining();
+ int toRemaining = to.remaining();
+ if (toRemaining >= remaining) {
+ to.put(from);
+ return remaining;
+ } else {
+ int limit = from.limit();
+ int position = from.position();
+ from.limit(position + toRemaining);
+ to.put(from);
+ from.limit(limit);
+ return toRemaining;
}
}
+
// ----------------------------------- OutputStreamOutputBuffer Inner Class
@@ -792,19 +895,24 @@
int len = chunk.getLength();
int start = chunk.getStart();
byte[] b = chunk.getBuffer();
- while (len > 0) {
- int thisTime = len;
- if (socket.getBufHandler().getWriteBuffer().position() == socket.getBufHandler().getWriteBuffer().capacity()) {
- flushBuffer();
- }
- if (thisTime > socket.getBufHandler().getWriteBuffer().remaining()) {
- thisTime = socket.getBufHandler().getWriteBuffer().remaining();
+ if (isBlocking()) {
+ while (len > 0) {
+ int thisTime = len;
+ if (socket.getBufHandler().getWriteBuffer().position() == socket.getBufHandler().getWriteBuffer().capacity()) {
+ flushBuffer(true);
+ }
+ if (thisTime > socket.getBufHandler().getWriteBuffer().remaining()) {
+ thisTime = socket.getBufHandler().getWriteBuffer().remaining();
+ }
+ addToBB(b,start,thisTime);
+ len = len - thisTime;
+ start = start + thisTime;
}
- addToBB(b,start,thisTime);
- len = len - thisTime;
- start = start + thisTime;
+ return chunk.getLength();
+ }else {
+ addToBB(b,start,len);
+ return len;
}
- return chunk.getLength();
}
Modified: tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Dec 3 16:06:24 2007
@@ -1419,8 +1419,12 @@
//the comet event takes care of clean up
//processSocket(ka.getChannel(), status, dispatch);
ka.setComet(false);//to avoid a loop
- processSocket(ka.getChannel(), status, false);//don't dispatch if the lines below are cancelling the key
- if (status == SocketStatus.TIMEOUT ) return; // don't close on comet timeout
+ if (status == SocketStatus.TIMEOUT ) {
+ processSocket(ka.getChannel(), status, true);
+ return; // don't close on comet timeout
+ } else {
+ processSocket(ka.getChannel(), status, false); //don't dispatch if the lines below are cancelling the key
+ }
}
handler.release(ka.getChannel());
if (key.isValid()) key.cancel();
@@ -1545,12 +1549,12 @@
//set interest ops to 0 so we don't get multiple
//invokations for both read and write on separate threads
reg(sk, attachment, 0);
- //read goes before write
- if (sk.isReadable()) {
- if (!processSocket(channel, SocketStatus.OPEN_READ))
+ //write goes before write
+ if (sk.isWritable()) {
+ if (!processSocket(channel, SocketStatus.OPEN_WRITE))
processSocket(channel, SocketStatus.DISCONNECT);
} else {
- if (!processSocket(channel, SocketStatus.OPEN_WRITE))
+ if (!processSocket(channel, SocketStatus.OPEN_READ))
processSocket(channel, SocketStatus.DISCONNECT);
}
} else {
Modified: tomcat/sandbox/gdev6x/webapps/docs/aio.xml
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/webapps/docs/aio.xml?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/webapps/docs/aio.xml (original)
+++ tomcat/sandbox/gdev6x/webapps/docs/aio.xml Mon Dec 3 16:06:24 2007
@@ -865,6 +865,16 @@
</ul>
</section>
+ <section name="Non blocking buffered writes">
+ <p>
+ Another feature of Comet is that you can enable buffered non blocking writes.
+ This is not non blocking as in the NIO sense, instead tomcat will buffer your response
+ and when you invoke response.flushBuffer() it will try to flush the buffer to the socket in
+ a non blocking fashion. If the buffer wrote out completely, CometEvent.isWriteable() will return true,
+ if there is more in the buffer to write, CometEvent.isWriteable() will return false, and you should not attempt
+ further writes, instead register for CometEvent.CometOperation.OP_WRITE to be notified when you can write again.
+ </p>
+ </section>
</body>
</document>
Modified: tomcat/sandbox/gdev6x/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/webapps/docs/changelog.xml?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/webapps/docs/changelog.xml (original)
+++ tomcat/sandbox/gdev6x/webapps/docs/changelog.xml Mon Dec 3 16:06:24 2007
@@ -17,6 +17,11 @@
<body>
<section name="Tomcat g6.xdev(unknown)">
<subsection name="Catalina">
+ <update>
+ Implement buffered write
+ This means that a Comet servlet can write to the buffer without blocking.
+ Writing actually happens upon flushBuffer in a non blocking fashion
+ </update>
<fix><bug>43653</bug>
Fix for SSL buffer mixup
</fix>
Modified: tomcat/sandbox/gdev6x/webapps/docs/config/http.xml
URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/webapps/docs/config/http.xml?rev=600737&r1=600736&r2=600737&view=diff
==============================================================================
--- tomcat/sandbox/gdev6x/webapps/docs/config/http.xml (original)
+++ tomcat/sandbox/gdev6x/webapps/docs/config/http.xml Mon Dec 3 16:06:24 2007
@@ -556,6 +556,11 @@
If you have an OOM outside of the Java Heap, then this parachute trick will not help.
</p>
</attribute>
+ <attribute name="bufferedWriteSize" required="false">
+ <p>(int) The size in bytes that should be used when Comet servlets used buffered/non blocking write logic.
+ The default is <code>64kb</code> or <code>64*1024 bytes</code>.
+ </p>
+ </attribute>
</attributes>
</subsection>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org