You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2012/07/06 08:53:53 UTC
svn commit: r1358055 - in /tomcat/trunk: java/org/apache/catalina/connector/
java/org/apache/catalina/core/ java/org/apache/coyote/
java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/
java/org/apache/coyote/spdy/ java/org/apache/tomcat/util/net/...
Author: fhanik
Date: Fri Jul 6 06:53:52 2012
New Revision: 1358055
URL: http://svn.apache.org/viewvc?rev=1358055&view=rev
Log:
Implement rev 1 of async/non-blocking writes
Modified:
tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
tomcat/trunk/java/org/apache/catalina/connector/Request.java
tomcat/trunk/java/org/apache/catalina/connector/Response.java
tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
tomcat/trunk/java/org/apache/coyote/ActionCode.java
tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
tomcat/trunk/java/org/apache/coyote/Response.java
tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteAdapter.java Fri Jul 6 06:53:52 2012
@@ -296,18 +296,15 @@ public class CoyoteAdapter implements Ad
}
- if (!request.isAsyncDispatching() && request.isAsync()) {
- AtomicBoolean result = new AtomicBoolean(true);
- req.action(ActionCode.ASYNC_DISPATCH_FOR_OPERATION, this);
- if (result.get()) {
- if (status==SocketStatus.OPEN_WRITE) {
- //TODO Notify write listener
- } else if (status==SocketStatus.OPEN_READ) {
- //TODO Notify read listener
- asyncConImpl.canRead();
- }
- success = true;
+ if (!request.isAsyncDispatching() && request.isAsync() && request.isAsyncOperation()) {
+ if (status == SocketStatus.OPEN_WRITE) {
+ // TODO Notify write listener
+ success = asyncConImpl.canWrite();
+ } else if (status == SocketStatus.OPEN_READ) {
+ // TODO Notify read listener
+ success = asyncConImpl.canRead();
}
+
}
if (request.isAsyncDispatching()) {
Modified: tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/CoyoteOutputStream.java Fri Jul 6 06:53:52 2012
@@ -109,23 +109,15 @@ public class CoyoteOutputStream
ob.close();
}
- /**
- * TODO SERVLET 3.1
- */
@Override
public boolean canWrite() {
- // TODO Auto-generated method stub
- return false;
+ return ob.canWrite();
}
- /**
- * TODO SERVLET 3.1
- */
@Override
public void setWriteListener(WriteListener listener) {
- // TODO Auto-generated method stub
-
+ ob.setWriteListener(listener);
}
Modified: tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/OutputBuffer.java Fri Jul 6 06:53:52 2012
@@ -23,6 +23,9 @@ import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.servlet.WriteListener;
import org.apache.catalina.Globals;
import org.apache.coyote.ActionCode;
@@ -607,4 +610,25 @@ public class OutputBuffer extends Writer
}
+ public boolean canWrite() {
+ if (getWriteListener()==null) throw new IllegalStateException("not in non blocking mode.");
+ AtomicBoolean canWrite = new AtomicBoolean(true);
+ coyoteResponse.action(ActionCode.NB_WRITE_INTEREST, canWrite);
+ return canWrite.get();
+}
+
+
+
+ private volatile WriteListener listener;
+ public void setWriteListener(WriteListener listener) {
+ this.listener = listener;
+ coyoteResponse.action(ActionCode.SET_WRITE_LISTENER, listener);
+ }
+
+ public WriteListener getWriteListener() {
+ return listener;
+ }
+
+
+
}
Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Request.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/Request.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Fri Jul 6 06:53:52 2012
@@ -1653,6 +1653,16 @@ public class Request
return result.get();
}
+ public boolean isAsyncOperation() {
+ if (asyncContext == null) {
+ return false;
+ }
+
+ AtomicBoolean result = new AtomicBoolean(false);
+ coyoteRequest.action(ActionCode.ASYNC_IS_ASYNC_OPERATION, result);
+ return result.get();
+ }
+
@Override
public boolean isAsyncSupported() {
if (this.asyncSupported == null) {
Modified: tomcat/trunk/java/org/apache/catalina/connector/Response.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Response.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/Response.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/Response.java Fri Jul 6 06:53:52 2012
@@ -153,6 +153,10 @@ public class Response
outputBuffer.setResponse(coyoteResponse);
}
+ public org.apache.coyote.Response getCoyoteResponse() {
+ return this.coyoteResponse;
+ }
+
/**
* Return the Context within which this Request is being processed.
Modified: tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java (original)
+++ tomcat/trunk/java/org/apache/catalina/core/AsyncContextImpl.java Fri Jul 6 06:53:52 2012
@@ -106,6 +106,8 @@ public class AsyncContextImpl implements
}
public boolean canRead() throws IOException {
+ if (request.getCoyoteRequest().getReadListener()==null) return false;
+
ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
ClassLoader newCL = request.getContext().getLoader().getClassLoader();
try {
@@ -121,7 +123,16 @@ public class AsyncContextImpl implements
}
public boolean canWrite() throws IOException {
- return false;
+ if (request.getResponse().getCoyoteResponse().getWriteListener()==null) return false;
+ ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
+ ClassLoader newCL = request.getContext().getLoader().getClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(newCL);
+ request.getResponse().getCoyoteResponse().getWriteListener().onWritePossible();
+ }finally {
+ Thread.currentThread().setContextClassLoader(oldCL);
+ }
+ return true;
}
public boolean timeout() throws IOException {
Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Fri Jul 6 06:53:52 2012
@@ -183,6 +183,11 @@ public enum ActionCode {
ASYNC_IS_ASYNC,
/**
+ * Callback to determine if async read/write is in progress
+ */
+ ASYNC_IS_ASYNC_OPERATION,
+
+ /**
* Callback to determine if async dispatch is in progress
*/
ASYNC_IS_STARTED,
Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java Fri Jul 6 06:53:52 2012
@@ -149,6 +149,10 @@ public class AsyncStateMachine<S> {
return state.isAsync();
}
+ public boolean isAsyncOperation() {
+ return state == AsyncState.READ_WRITE_OP;
+ }
+
public boolean isAsyncDispatching() {
return state.isDispatching();
}
Modified: tomcat/trunk/java/org/apache/coyote/Response.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Response.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Response.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Response.java Fri Jul 6 06:53:52 2012
@@ -21,6 +21,11 @@ import java.io.IOException;
import java.io.StringReader;
import java.util.Locale;
+import javax.servlet.ReadListener;
+import javax.servlet.WriteListener;
+
+import org.apache.coyote.http11.AbstractInputBuffer;
+import org.apache.coyote.http11.AbstractOutputBuffer;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.http.parser.AstMediaType;
@@ -541,4 +546,29 @@ public final class Response {
}
return outputBuffer.getBytesWritten();
}
+
+ protected volatile WriteListener listener;
+
+ public WriteListener getWriteListener() {
+ return listener;
+}
+
+ public void setWriteListener(WriteListener listener) {
+ //TODO SERVLET 3.1: is it allowed to switch from non-blocking back to blocking?
+ setBlocking(listener==null);
+ this.listener = listener;
+ }
+
+ protected volatile boolean blocking = true;
+
+ public boolean isBlocking() {
+ return blocking;
+ }
+
+ public void setBlocking(boolean blocking) throws IllegalStateException {
+ @SuppressWarnings("rawtypes")
+ AbstractOutputBuffer buf = (AbstractOutputBuffer)outputBuffer;
+ if (!blocking && !buf.supportsNonBlocking()) throw new IllegalStateException();
+ this.blocking = blocking;
+ }
}
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Fri Jul 6 06:53:52 2012
@@ -455,6 +455,8 @@ public abstract class AbstractAjpProcess
((AtomicBoolean) param).set(asyncStateMachine.isAsyncDispatching());
} else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
((AtomicBoolean) param).set(asyncStateMachine.isAsync());
+ } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION) {
+ ((AtomicBoolean) param).set(asyncStateMachine.isAsyncOperation());
} else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
((AtomicBoolean) param).set(asyncStateMachine.isAsyncTimingOut());
} else if (actionCode == ActionCode.UPGRADE) {
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Fri Jul 6 06:53:52 2012
@@ -252,8 +252,7 @@ public class AjpNioProcessor extends Abs
if (actionCode == ActionCode.ASYNC_COMPLETE) {
if (asyncStateMachine.asyncComplete()) {
- ((NioEndpoint)endpoint).processSocket(this.socket,
- SocketStatus.OPEN_READ, false);
+ ((NioEndpoint)endpoint).dispatchForEvent(socket, SocketStatus.OPEN_READ, false);
}
} else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
if (param == null) return;
@@ -264,11 +263,9 @@ public class AjpNioProcessor extends Abs
}
} else if (actionCode == ActionCode.ASYNC_DISPATCH) {
if (asyncStateMachine.asyncDispatch()) {
- ((NioEndpoint)endpoint).processSocket(this.socket,
- SocketStatus.OPEN_READ, true);
+ ((NioEndpoint)endpoint).dispatchForEvent(socket, SocketStatus.OPEN_READ, true); }
}
}
- }
// ------------------------------------------------------ Protected Methods
Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Fri Jul 6 06:53:52 2012
@@ -832,6 +832,8 @@ public abstract class AbstractHttp11Proc
((AtomicBoolean) param).set(asyncStateMachine.isAsyncDispatching());
} else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
((AtomicBoolean) param).set(asyncStateMachine.isAsync());
+ } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION) {
+ ((AtomicBoolean) param).set(asyncStateMachine.isAsyncOperation());
} else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
((AtomicBoolean) param).set(asyncStateMachine.isAsyncTimingOut());
} else if (actionCode == ActionCode.UPGRADE) {
Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java Fri Jul 6 06:53:52 2012
@@ -546,4 +546,9 @@ public abstract class AbstractOutputBuff
}
}
+ // --------------------------------------------------------- Public Methods
+
+
+ public abstract boolean supportsNonBlocking();
+
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Fri Jul 6 06:53:52 2012
@@ -20,9 +20,11 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.nio.channels.SelectionKey;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine;
import javax.servlet.ReadListener;
+import javax.servlet.WriteListener;
import org.apache.coyote.ActionCode;
import org.apache.coyote.RequestInfo;
@@ -102,19 +104,7 @@ public class Http11NioProcessor extends
*/
protected SocketWrapper<NioChannel> socket = null;
- /**
- * TODO SERVLET 3.1
- */
- protected ReadListener readListener;
-
- public ReadListener getReadListener() {
- return readListener;
- }
-
- public void setReadListener(ReadListener listener) {
- readListener = listener;
- }
-
+ protected volatile boolean wantOnWritePossible = false;
// --------------------------------------------------------- Public Methods
@@ -185,6 +175,84 @@ public class Http11NioProcessor extends
}
+
+
+ @Override
+ public SocketState asyncDispatch(SocketStatus status) {
+ final NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
+
+
+ if (status == SocketStatus.OPEN_WRITE) {
+ try {
+ asyncStateMachine.asyncOperation();
+ try {
+ if (outputBuffer.hasDataToWrite()) {
+ //System.out.println("Attempting data flush!!");
+ outputBuffer.flushBuffer(false);
+ }
+ }catch (IOException x) {
+ if (log.isDebugEnabled()) log.debug("Unable to write async data.",x);
+ //TODO FIXME-- fix - so we can notify of error
+ return SocketState.CLOSED;
+ }
+ //return if we have more data to write
+ if (isRegisteredForWrite(attach)) {
+ return SocketState.LONG;
+ }
+ }catch (IllegalStateException x) {
+ }
+ } else if (status == SocketStatus.OPEN_READ) {
+ try {
+ try {
+ if (inputBuffer.nbRead()>0) {
+ asyncStateMachine.asyncOperation();
+ }
+ }catch (IOException x) {
+ if (log.isDebugEnabled()) log.debug("Unable to read async data.",x);
+ //TODO FIXME-- fix - so we can notify of error
+ return SocketState.CLOSED;
+ }
+ //no write-registration check on the read path here; it is done after super.asyncDispatch() below
+ }catch (IllegalStateException x) {
+ }
+ }
+
+ SocketState state = super.asyncDispatch(status);
+ //return if we have more data to write
+ if (isRegisteredForWrite(attach)) {
+ return SocketState.LONG;
+ } else {
+ return state;
+ }
+ }
+
+
+
+ @Override
+ public SocketState process(SocketWrapper<NioChannel> socketWrapper) throws IOException {
+ SocketState state = super.process(socketWrapper);
+ final NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
+ //return if we have more data to write
+ if (isRegisteredForWrite(attach)) {
+ return SocketState.LONG;
+ } else {
+ return state;
+ }
+ }
+
+
+
+
+ protected boolean isRegisteredForWrite(KeyAttachment attach) {
+ //return if we have more data to write
+ if (outputBuffer.hasDataToWrite()) {
+ attach.interestOps(SelectionKey.OP_WRITE);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
@Override
protected void resetTimeouts() {
final NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
@@ -305,11 +373,14 @@ public class Http11NioProcessor extends
}
+
+
@Override
public void recycleInternal() {
socket = null;
comet = false;
sendfileData = null;
+ wantOnWritePossible = false;
}
@@ -492,8 +563,7 @@ public class Http11NioProcessor extends
}
} else if (actionCode == ActionCode.ASYNC_COMPLETE) {
if (asyncStateMachine.asyncComplete()) {
- ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
- SocketStatus.OPEN_READ, true);
+ ((NioEndpoint)endpoint).dispatchForEvent(this.socket.getSocket(),SocketStatus.OPEN_READ, true);
}
} else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
if (param==null) {
@@ -508,14 +578,15 @@ public class Http11NioProcessor extends
attach.setTimeout(timeout);
} else if (actionCode == ActionCode.ASYNC_DISPATCH) {
if (asyncStateMachine.asyncDispatch()) {
- ((NioEndpoint)endpoint).processSocket(this.socket.getSocket(),
- SocketStatus.OPEN_READ, true);
+ ((NioEndpoint)endpoint).dispatchForEvent(this.socket.getSocket(),SocketStatus.OPEN_READ, true);
}
- } else if (actionCode == ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
- asyncStateMachine.asyncOperation();
} else if (actionCode == ActionCode.SET_READ_LISTENER) {
ReadListener listener = (ReadListener)param;
request.setReadListener(listener);
+ } else if (actionCode == ActionCode.SET_WRITE_LISTENER) {
+ WriteListener listener = (WriteListener)param;
+ response.setWriteListener(listener);
+ outputBuffer.setBlocking(listener==null);
} else if (actionCode == ActionCode.NB_READ_INTEREST) {
if (socket==null || socket.getSocket().getAttachment(false)==null) {
return;
@@ -524,8 +595,27 @@ public class Http11NioProcessor extends
if (rp.getStage() == org.apache.coyote.Constants.STAGE_SERVICE) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getSocket().getAttachment(false);
attach.interestOps(attach.interestOps() | SelectionKey.OP_READ);
+ } else {
+ throw new IllegalStateException("Calling isReady asynchronously is illegal.");
}
-
+ } else if (actionCode == ActionCode.NB_WRITE_INTEREST) {
+ if (socket==null || socket.getSocket().getAttachment(false)==null) {
+ return;
+ }
+ AtomicBoolean canWrite = (AtomicBoolean)param;
+ RequestInfo rp = request.getRequestProcessor();
+ if (rp.getStage() == org.apache.coyote.Constants.STAGE_SERVICE) {
+ if (outputBuffer.isWritable()) {
+ canWrite.set(true);
+ } else {
+ canWrite.set(false);
+ wantOnWritePossible = true;
+ }
+ } else {
+ throw new IllegalStateException("Calling canWrite asynchronously is illegal.");
+ }
+ } else if (actionCode == ActionCode.ASYNC_DISPATCH_FOR_OPERATION) {
+ asyncStateMachine.asyncOperation();
}
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Fri Jul 6 06:53:52 2012
@@ -86,6 +86,11 @@ public class InternalAprOutputBuffer ext
// --------------------------------------------------------- Public Methods
@Override
+ public boolean supportsNonBlocking() {
+ return false;
+ }
+
+ @Override
public void init(SocketWrapper<Long> socketWrapper,
AbstractEndpoint endpoint) throws IOException {
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Fri Jul 6 06:53:52 2012
@@ -21,6 +21,12 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
@@ -76,9 +82,80 @@ public class InternalNioOutputBuffer ext
*/
private NioSelectorPool pool;
+ /**
+ * Flag indicating whether writes are performed in blocking mode (true) or non-blocking, buffered mode (false)
+ */
+ protected volatile boolean blocking = true;
+
+ /**
+ * Track if the byte buffer is flipped
+ */
+ protected volatile boolean flipped = false;
+
+ /**
+ * For "non-blocking" writes use an external buffer
+ */
+ protected volatile LinkedBlockingDeque<ByteBufferHolder> bufferedWrite = null;
+
+ /**
+ * The max size of the buffered write buffer
+ */
+ protected int bufferedWriteSize = 64*1024; //64k default write buffer
+
+ /**
+ * Number of bytes last written
+ */
+ protected AtomicInteger lastWrite = new AtomicInteger(1);
+
+ protected class ByteBufferHolder {
+ private ByteBuffer buf;
+ private AtomicBoolean flipped;
+ public ByteBufferHolder(ByteBuffer buf, boolean flipped) {
+ this.buf = buf;
+ this.flipped = new AtomicBoolean(flipped);
+ }
+ public ByteBuffer getBuf() {
+ return buf;
+ }
+ public boolean isFlipped() {
+ return flipped.get();
+ }
+
+ public boolean flip() {
+ if (flipped.compareAndSet(false, true)) {
+ buf.flip();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public boolean hasData() {
+ if (flipped.get()) {
+ return buf.remaining()>0;
+ } else {
+ return buf.position()>0;
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder(super.toString());
+ builder.append("[flipped=");
+ builder.append(isFlipped()?"true, remaining=" : "false, position=");
+ builder.append(isFlipped()? buf.remaining(): buf.position());
+ builder.append("]");
+ return builder.toString();
+ }
+
+ }
// --------------------------------------------------------- Public Methods
+ @Override
+ public boolean supportsNonBlocking() {
+ return true;
+ }
/**
* Flush the response.
@@ -91,7 +168,7 @@ public class InternalNioOutputBuffer ext
super.flush();
// Flush the current buffer
- flushBuffer();
+ flushBuffer(isBlocking());
}
@@ -107,6 +184,9 @@ public class InternalNioOutputBuffer ext
socket.getBufHandler().getWriteBuffer().clear();
socket = null;
}
+ lastWrite.set(1);
+ setBlocking(true);
+ flipped = false;
}
@@ -118,7 +198,7 @@ public class InternalNioOutputBuffer ext
@Override
public void endRequest() throws IOException {
super.endRequest();
- flushBuffer();
+ flushBuffer(true);
}
// ------------------------------------------------ HTTP/1.1 Output Methods
@@ -146,8 +226,12 @@ public class InternalNioOutputBuffer ext
* @throws IOException
* TODO Fix non blocking write properly
*/
+ int total = 0;
private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
- if ( flip ) bytebuffer.flip();
+ if ( flip ) {
+ bytebuffer.flip();
+ flipped = true;
+ }
int written = 0;
NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
@@ -168,7 +252,14 @@ public class InternalNioOutputBuffer ext
}finally {
if ( selector != null ) pool.put(selector);
}
- if ( block ) bytebuffer.clear(); //only clear
+ if ( block || bytebuffer.remaining()==0) {
+ //blocking writes must empty the buffer
+ //and if remaining==0 then we did empty it
+ bytebuffer.clear();
+ flipped = false;
+ }
+ total+= written;
+ //System.out.println("Successful write("+written+ " / "+total);
return written;
}
@@ -204,31 +295,48 @@ public class InternalNioOutputBuffer ext
}
+
private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
- while (length > 0) {
- int thisTime = length;
- if (socket.getBufHandler().getWriteBuffer().position() ==
- socket.getBufHandler().getWriteBuffer().capacity()
- || socket.getBufHandler().getWriteBuffer().remaining()==0) {
- flushBuffer();
- }
- if (thisTime > socket.getBufHandler().getWriteBuffer().remaining()) {
- thisTime = socket.getBufHandler().getWriteBuffer().remaining();
- }
- socket.getBufHandler().getWriteBuffer().put(buf, offset, thisTime);
+ //try to write to socket first
+ if (length==0) return;
+
+ boolean dataLeft = flushBuffer(isBlocking());
+
+ while (!dataLeft && length>0) {
+ int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
length = length - thisTime;
offset = offset + thisTime;
+ writeToSocket(socket.getBufHandler().getWriteBuffer(), isBlocking(), true);
+ dataLeft = flushBuffer(isBlocking());
}
+
NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
if ( ka!= null ) ka.access();//prevent timeouts for just doing client writes
+
+ if (!isBlocking() && length>0) {
+ //we must buffer as long as it fits
+ //ByteBufferHolder tail = bufferedWrite.
+ addToBuffers(buf, offset, length);
+ }
+ }
+
+ private void addToBuffers(byte[] buf, int offset, int length) {
+ ByteBufferHolder holder = bufferedWrite.peekLast();
+ if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
+ ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
+ holder = new ByteBufferHolder(buffer,false);
+ bufferedWrite.add(holder);
+ }
+ holder.getBuf().put(buf,offset,length);
}
/**
* Callback to write data from the buffer.
*/
- private void flushBuffer() throws IOException {
+ protected boolean flushBuffer(boolean block) throws IOException {
+ int result = 0;
//prevent timeout for async,
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if (key != null) {
@@ -236,16 +344,61 @@ public class InternalNioOutputBuffer ext
attach.access();
}
+ boolean dataLeft = hasMoreDataToFlush();
+
//write to the socket, if there is anything to write
- if (socket.getBufHandler().getWriteBuffer().position() > 0) {
- socket.getBufHandler().getWriteBuffer().flip();
- writeToSocket(socket.getBufHandler().getWriteBuffer(),true, false);
+ if ( dataLeft ) {
+ result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped);
}
+
+ dataLeft = hasMoreDataToFlush();
+
+ if (!dataLeft && bufferedWrite!=null) {
+ Iterator<ByteBufferHolder> bufIter = bufferedWrite.iterator();
+ while (!hasMoreDataToFlush() && bufIter.hasNext()) {
+ ByteBufferHolder buffer = bufIter.next();
+ buffer.flip();
+ while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
+ transfer(buffer.getBuf(), socket.getBufHandler().getWriteBuffer());
+ if (buffer.getBuf().remaining() == 0) {
+ bufIter.remove();
}
+ result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
+ //here we must break if we didn't finish the write
+ }
+ }
+ }
+
+ dataLeft = hasMoreDataToFlush();
+
+ return dataLeft;
+ }
+
+ private boolean hasMoreDataToFlush() {
+ return (flipped && socket.getBufHandler().getWriteBuffer().remaining()>0) ||
+ (!flipped && socket.getBufHandler().getWriteBuffer().position() > 0);
+ }
+
+ private int transfer(byte[] from, int offset, int length, ByteBuffer to) {
+ int max = Math.min(length, to.remaining());
+ ByteBuffer tmp = ByteBuffer.wrap(from, offset, max);
+ tmp.limit (tmp.position() + max);
+ to.put (tmp);
+ return max;
+ }
+
+ private int transfer(ByteBuffer from, ByteBuffer to) {
+ int max = Math.min(from.remaining(), to.remaining());
+ ByteBuffer tmp = from.duplicate ();
+ tmp.limit (tmp.position() + max);
+ to.put (tmp);
+ from.position(from.position() + max);
+ return max;
+ }
- // ----------------------------------- OutputStreamOutputBuffer Inner Class
+ // ----------------------------------- OutputStreamOutputBuffer Inner Class
/**
* This class is an output buffer which will write data to an output
@@ -275,4 +428,44 @@ public class InternalNioOutputBuffer ext
return byteCount;
}
}
+
+ //----------------------------------------non blocking writes -----------------
+ public void setBlocking(boolean blocking) {
+ this.blocking = blocking;
+ if (blocking)
+ bufferedWrite = null;
+ else
+ bufferedWrite = new LinkedBlockingDeque<ByteBufferHolder>();
+}
+
+ public void setBufferedWriteSize(int bufferedWriteSize) {
+ this.bufferedWriteSize = bufferedWriteSize;
+ }
+
+ public boolean isBlocking() {
+ return blocking;
+ }
+
+ private boolean hasBufferedData() {
+ boolean result = false;
+ if (bufferedWrite!=null) {
+ Iterator<ByteBufferHolder> iter = bufferedWrite.iterator();
+ while (!result && iter.hasNext()) {
+ result = iter.next().hasData();
+ }
+ }
+ return result;
+ }
+
+ public boolean hasDataToWrite() {
+ return hasMoreDataToFlush() || hasBufferedData();
+ }
+
+ public int getBufferedWriteSize() {
+ return bufferedWriteSize;
+ }
+
+ public boolean isWritable() {
+ return !hasDataToWrite();
+ }
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalOutputBuffer.java Fri Jul 6 06:53:52 2012
@@ -96,6 +96,11 @@ public class InternalOutputBuffer extend
// --------------------------------------------------------- Public Methods
@Override
+ public boolean supportsNonBlocking() { // InternalOutputBuffer always answers false here
+ return false;
+ }
+
+ @Override
public void init(SocketWrapper<Socket> socketWrapper,
AbstractEndpoint endpoint) throws IOException {
Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Fri Jul 6 06:53:52 2012
@@ -386,6 +386,8 @@ public class SpdyProcessor extends Abstr
((AtomicBoolean) param).set(asyncStateMachine.isAsyncDispatching());
} else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
((AtomicBoolean) param).set(asyncStateMachine.isAsync());
+ } else if (actionCode == ActionCode.ASYNC_IS_ASYNC_OPERATION) {
+ ((AtomicBoolean) param).set(asyncStateMachine.isAsyncOperation());
} else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
((AtomicBoolean) param).set(asyncStateMachine.isAsyncTimingOut());
} else {
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Fri Jul 6 06:53:52 2012
@@ -716,7 +716,16 @@ public class NioEndpoint extends Abstrac
return true;
}
- public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
+ public boolean dispatchForEvent(NioChannel socket, SocketStatus status, boolean dispatch) { // public entry point introduced as processSocket becomes protected
+ if (!dispatch) {
+ processSocket(socket,status,dispatch); // direct call with dispatch=false; NOTE(review): its boolean result is discarded
+ } else {
+ socket.getPoller().add(socket, OP_CALLBACK); // queue a callback with the poller; 'status' is not forwarded on this path
+ }
+ return true; // unconditional, whichever branch ran
+ }
+
+ protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
if (attachment == null) {
@@ -900,7 +909,7 @@ public class NioEndpoint extends Abstrac
final KeyAttachment att = (KeyAttachment) key.attachment();
if ( att!=null ) {
//handle callback flag
- if (att.getComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
+ if ((interestOps & OP_CALLBACK) == OP_CALLBACK ) {
att.setCometNotify(true);
} else {
att.setCometNotify(false);
@@ -910,7 +919,8 @@ public class NioEndpoint extends Abstrac
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
att.interestOps(ops);
- key.interestOps(ops);
+ if (att.getCometNotify()) key.interestOps(0);
+ else key.interestOps(ops);
} else {
cancel = true;
}
@@ -979,10 +989,6 @@ public class NioEndpoint extends Abstrac
public void cometInterest(NioChannel socket) {
KeyAttachment att = (KeyAttachment)socket.getAttachment(false);
add(socket,att.getCometOps());
- if ( (att.getCometOps()&OP_CALLBACK) == OP_CALLBACK ) {
- nextExpiration = 0; //force the check for faster callback
- selector.wakeup();
- }
}
/**
@@ -1001,6 +1007,9 @@ public class NioEndpoint extends Abstrac
PollerEvent r = eventCache.poll();
if ( r==null) r = new PollerEvent(socket,null,interestOps);
else r.reset(socket,null,interestOps);
+ if ( (interestOps&OP_CALLBACK) == OP_CALLBACK ) {
+ nextExpiration = 0; //force the check for faster callback
+ }
addEvent(r);
if (close) {
processSocket(socket, SocketStatus.STOP, false);
@@ -1256,7 +1265,7 @@ public class NioEndpoint extends Abstrac
boolean readAndWrite = sk.isReadable() && sk.isWritable();
reg(sk, attachment, 0);
- if (readAndWrite) {
+ if (attachment.isAsync() && readAndWrite) {
//remember the that we want to know about write too
attachment.interestOps(SelectionKey.OP_WRITE);
}
@@ -1406,7 +1415,7 @@ public class NioEndpoint extends Abstrac
// - the selector simply timed out (suggests there isn't much load)
// - the nextExpiration time has passed
// - the server socket is being closed
- if ((keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
+ if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
return;
}
//timeout
@@ -1421,10 +1430,11 @@ public class NioEndpoint extends Abstrac
cancelledKey(key, SocketStatus.ERROR); //we don't support any keys without attachments
} else if ( ka.getError() ) {
cancelledKey(key, SocketStatus.ERROR);//TODO this is not yet being used
- } else if (ka.getComet() && ka.getCometNotify() ) {
+ } else if (ka.getCometNotify() ) {
ka.setCometNotify(false);
- reg(key,ka,0);//avoid multiple calls, this gets reregistered after invocation
- //if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
+ int ops = ka.interestOps() & ~OP_CALLBACK;;
+ reg(key,ka,0);//avoid multiple calls, this gets re-registered after invocation
+ ka.interestOps(ops);
if (!processSocket(ka.getChannel(), SocketStatus.OPEN_READ, true)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT, true);
} else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ ||
(ka.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Fri Jul 6 06:53:52 2012
@@ -478,7 +478,7 @@ public class SecureNioChannel extends Ni
int written = sc.write(src);
return written;
} else {
- //make sure we can handle expand, and that we only use on buffer
+ //make sure we can handle expand, and that we only use one buffer
if ( (!this.isSendFile()) && (src != bufHandler.getWriteBuffer()) ) throw new IllegalArgumentException("You can only write using the application write buffer provided by the handler.");
//are we closing or closed?
if ( closing || closed) throw new IOException("Channel is in closing state.");
Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Fri Jul 6 06:53:52 2012
@@ -17,10 +17,10 @@
package org.apache.catalina.nonblocking;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
@@ -28,14 +28,12 @@ import javax.servlet.AsyncListener;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.junit.Assert;
-import org.junit.Test;
-
import org.apache.catalina.Wrapper;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.startup.BytesStreamer;
@@ -44,9 +42,14 @@ import org.apache.catalina.startup.Tomca
import org.apache.catalina.startup.TomcatBaseTest;
import org.apache.coyote.http11.Http11NioProtocol;
import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.ByteChunk.ByteOutputChannel;
+import org.junit.Assert;
+import org.junit.Test;
public class TestNonBlockingAPI extends TomcatBaseTest {
+ public static final long bytesToDownload = 1024 * 1024 * 5;
+
@Override
protected String getProtocol() {
return Http11NioProtocol.class.getName();
@@ -58,31 +61,89 @@ public class TestNonBlockingAPI extends
}
@Test
- public void testOne() throws Exception {
+ public void testNonBlockingRead() throws Exception { // drives NBReadServlet through postUrl and expects HTTP 200
// Configure a context with digest auth and a single protected resource
Tomcat tomcat = getTomcatInstance();
// Must have a real docBase - just use temp
- StandardContext ctx = (StandardContext)
- tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+ StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir"));
- NBTesterServlet servlet = new NBTesterServlet();
- String servletName = NBTesterServlet.class.getName();
+ NBReadServlet servlet = new NBReadServlet();
+ String servletName = NBReadServlet.class.getName();
Wrapper servletWrapper = tomcat.addServlet(ctx, servletName, servlet);
ctx.addServletMapping("/", servletName);
tomcat.start();
- Map<String,List<String>> resHeaders= new HashMap<String, List<String>>();
- int rc = postUrl(true, new DataWriter(),"http://localhost:" + getPort() + "/", new ByteChunk(),resHeaders, null);
+ Map<String, List<String>> resHeaders = new HashMap<String, List<String>>();
+ int rc = postUrl(true, new DataWriter(500), "http://localhost:" + getPort() + "/", new ByteChunk(), // 500 = DataWriter's delay in ms between chunks
+ resHeaders, null);
Assert.assertEquals(HttpServletResponse.SC_OK, rc);
}
+ @Test
+ public void testNonBlockingWrite() throws Exception { // downloads TestNonBlockingAPI.bytesToDownload bytes from NBWriteServlet through a deliberately slow reader
+ String bind = "localhost";
+ // Serve NBWriteServlet from the root context; the client below drains the response slowly
+ Tomcat tomcat = getTomcatInstance();
+ // Must have a real docBase - just use temp
+ StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+
+ NBWriteServlet servlet = new NBWriteServlet();
+ String servletName = NBWriteServlet.class.getName();
+ Wrapper servletWrapper = tomcat.addServlet(ctx, servletName, servlet);
+ ctx.addServletMapping("/", servletName);
+ tomcat.getConnector().setProperty("socket.txBufSize", "1024"); // presumably a tiny send buffer so server writes back up quickly - confirm
+ tomcat.getConnector().setProperty("address", bind);
+ System.out.println(tomcat.getConnector().getProperty("address"));
+ tomcat.start();
+
+ Map<String, List<String>> resHeaders = new HashMap<String, List<String>>();
+ ByteChunk slowReader = new ByteChunk();
+ slowReader.setLimit(1); // FIXME BUFFER IS BROKEN, 0 doesn't work
+ slowReader.setByteOutputChannel(new ByteOutputChannel() {
+ long counter = 0; // total bytes received so far
+ long delta = 0; // bytes received since the last pause
+
+ @Override
+ public void realWriteBytes(byte[] cbuf, int off, int len) throws IOException {
+ try {
+ if (len == 0)
+ return;
+ counter += len;
+ delta += len;
+ if (counter > bytesToDownload) {
+ System.out.println("ERROR Downloaded more than expected ERROR");
+ } else if (counter == bytesToDownload) {
+ System.out.println("Download complete(" + bytesToDownload + " bytes)");
+ // } else if (counter > (1966086)) {
+ // System.out.println("Download almost complete, missing bytes ("+counter+")");
+ } else if (delta > (bytesToDownload / 16)) { // pause after each further 1/16 of the payload
+ System.out.println("Read " + counter + " bytes.");
+ delta = 0;
+ Thread.sleep(500); // sleep is static: call it on Thread, not through currentThread()
+ }
+ } catch (Exception x) {
+ throw new IOException(x);
+ }
+ }
+ });
+ int rc = postUrl(true, new DataWriter(0), "http://" + bind + ":" + getPort() + "/", slowReader, resHeaders, // DataWriter(0): no delay between request chunks
+ null);
+ slowReader.flushBuffer();
+ Assert.assertEquals(HttpServletResponse.SC_OK, rc); // NOTE(review): only the status is asserted; the byte count is merely printed
+ }
public static class DataWriter implements BytesStreamer {
final int max = 5;
int count = 0;
+ long delay = 0;
byte[] b = "WANTMORE".getBytes();
byte[] f = "FINISHED".getBytes();
+
+ public DataWriter(long delay) { // delay: ms that next() sleeps before each chunk after the first; values <= 0 skip the sleep
+ this.delay = delay;
+ }
+
@Override
public int getLength() {
return b.length * max;
@@ -90,7 +151,7 @@ public class TestNonBlockingAPI extends
@Override
public int available() {
- if (count<max) {
+ if (count < max) {
return b.length;
} else {
return 0;
@@ -100,9 +161,14 @@ public class TestNonBlockingAPI extends
@Override
public byte[] next() {
if (count < max) {
- if (count>0) try {Thread.sleep(6000);}catch(Exception x){}
+ if (count > 0)
+ try {
+ if (delay > 0)
+ Thread.sleep(delay);
+ } catch (Exception x) {
+ }
count++;
- if (count<max)
+ if (count < max)
return b;
else
return f;
@@ -113,12 +179,12 @@ public class TestNonBlockingAPI extends
}
- @WebServlet(asyncSupported=true)
- public static class NBTesterServlet extends TesterServlet {
+ @WebServlet(asyncSupported = true)
+ public static class NBReadServlet extends TesterServlet {
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- //step 1 - start async
+ // step 1 - start async
AsyncContext actx = req.startAsync();
actx.setTimeout(Long.MAX_VALUE);
actx.addListener(new AsyncListener() {
@@ -147,7 +213,7 @@ public class TestNonBlockingAPI extends
}
});
- //step 2 - notify on read
+ // step 2 - notify on read
ServletInputStream in = req.getInputStream();
ReadListener rlist = new TestReadListener(actx);
in.setReadListener(rlist);
@@ -155,17 +221,12 @@ public class TestNonBlockingAPI extends
while (in.isReady()) {
rlist.onDataAvailable();
}
- //step 3 - notify that we wish to read
- //ServletOutputStream out = resp.getOutputStream();
- //out.setWriteListener(new TestWriteListener(actx));
+ // step 3 - notify that we wish to read
+ // ServletOutputStream out = resp.getOutputStream();
+ // out.setWriteListener(new TestWriteListener(actx));
}
- @Override
- protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- doGet(req,resp);
- }
-
private class TestReadListener implements ReadListener {
AsyncContext ctx;
@@ -177,9 +238,9 @@ public class TestNonBlockingAPI extends
public void onDataAvailable() {
try {
ServletInputStream in = ctx.getRequest().getInputStream();
- int avail=0;
+ int avail = 0;
String s = "";
- while ((avail=in.dataAvailable()) > 0) {
+ while ((avail = in.dataAvailable()) > 0) {
byte[] b = new byte[avail];
in.read(b);
s += new String(b);
@@ -191,7 +252,7 @@ public class TestNonBlockingAPI extends
} else {
in.isReady();
}
- }catch (Exception x) {
+ } catch (Exception x) {
x.printStackTrace();
ctx.complete();
}
@@ -212,9 +273,62 @@ public class TestNonBlockingAPI extends
}
}
- private class TestWriteListener implements WriteListener {
+ }
+
+ @WebServlet(asyncSupported = true)
+ public static class NBWriteServlet extends TesterServlet {
+ @Override
+ protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { // starts async processing and hands the response to a TestWriteListener
+ // step 1 - start async
+ AsyncContext actx = req.startAsync();
+ actx.setTimeout(Long.MAX_VALUE); // largest possible timeout value, so the slow download is not cut short
+ actx.addListener(new AsyncListener() { // every callback below only logs its own name
+
+ @Override
+ public void onTimeout(AsyncEvent event) throws IOException {
+ System.out.println("onTimeout");
+
+ }
+
+ @Override
+ public void onStartAsync(AsyncEvent event) throws IOException {
+ System.out.println("onStartAsync");
+
+ }
+
+ @Override
+ public void onError(AsyncEvent event) throws IOException {
+ System.out.println("onError");
+
+ }
+
+ @Override
+ public void onComplete(AsyncEvent event) throws IOException {
+ System.out.println("onComplete");
+
+ }
+ });
+ // step 2 - notify on read (disabled: this servlet only exercises non-blocking writes)
+ // ServletInputStream in = req.getInputStream();
+ // ReadListener rlist = new TestReadListener(actx);
+ // in.setReadListener(rlist);
+ //
+ // while (in.isReady()) {
+ // rlist.onDataAvailable();
+ // }
+ // step 3 - register the write listener, then trigger the first write attempt by hand
+ ServletOutputStream out = resp.getOutputStream();
+ resp.setBufferSize(200 * 1024);
+ TestWriteListener listener = new TestWriteListener(actx);
+ out.setWriteListener(listener);
+ listener.onWritePossible(); // called directly here rather than waiting for a container callback
+ }
+
+ private class TestWriteListener implements WriteListener {
+ long chunk = 1024 * 1024;
AsyncContext ctx;
+ long bytesToDownload = TestNonBlockingAPI.bytesToDownload;
public TestWriteListener(AsyncContext ctx) {
this.ctx = ctx;
@@ -222,36 +336,40 @@ public class TestNonBlockingAPI extends
@Override
public void onWritePossible() {
- // TODO Auto-generated method stub
+ System.out.println("onWritePossible");
+ try { // write chunks of 'X' (at most 'chunk' bytes each) while the stream accepts them, then complete once all is out
+ long left = Math.max(bytesToDownload, 0);
+ long start = System.currentTimeMillis();
+ long before = left;
+ while (left > 0 && ctx.getResponse().getOutputStream().canWrite()) {
+ byte[] b = new byte[(int) Math.min(chunk, bytesToDownload)];
+ Arrays.fill(b, (byte) 'X');
+ ctx.getResponse().getOutputStream().write(b);
+ bytesToDownload -= b.length; // field, not local: progress survives across callbacks
+ left = Math.max(bytesToDownload, 0);
+ }
+ long end = System.currentTimeMillis(); // sample after the loop so the elapsed time covers the writes
+ System.out
+ .println("Write took:" + (end - start) + " ms. Bytes before=" + before + " after=" + left);
+ // only call complete if we have emptied the buffer
+ if (left == 0 && ctx.getResponse().getOutputStream().canWrite()) {
+ // it is illegal to call complete
+ // if there is a write in progress
+ ctx.complete();
+ }
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
}
@Override
public void onError(Throwable throwable) {
- // TODO Auto-generated method stub
-
+ System.out.println("onError"); // reporting only; NOTE(review): the AsyncContext is not completed on this path
+ throwable.printStackTrace();
}
}
-
-
- }
-
-
- private static class StockQuotes {
- public StockQuotes() {
- super();
- }
- Random r = new Random(System.currentTimeMillis());
-
- public String getNextQuote() {
- return String.format("VMW: $%10.0f", r.nextDouble());
- }
-
-
-
}
-
-
}
Modified: tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java?rev=1358055&r1=1358054&r2=1358055&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java (original)
+++ tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java Fri Jul 6 06:53:52 2012
@@ -49,6 +49,7 @@ import org.apache.catalina.core.AprLifec
import org.apache.catalina.core.StandardServer;
import org.apache.catalina.session.StandardManager;
import org.apache.catalina.valves.AccessLogValve;
+import org.apache.coyote.http11.Http11NioProtocol;
import org.apache.tomcat.util.buf.ByteChunk;
/**
@@ -141,9 +142,9 @@ public abstract class TomcatBaseTest ext
// Has a protocol been specified
String protocol = System.getProperty("tomcat.test.protocol");
- // Use BIO by default
+ // Use NIO by default in Tomcat 8
if (protocol == null) {
- protocol = "org.apache.coyote.http11.Http11Protocol";
+ protocol = Http11NioProtocol.class.getName();
}
return protocol;
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org