You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2012/07/06 18:01:36 UTC
svn commit: r1358287 - in /tomcat/trunk:
java/org/apache/coyote/AsyncStateMachine.java
test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
test/org/apache/catalina/startup/TomcatBaseTest.java
Author: fhanik
Date: Fri Jul 6 16:01:36 2012
New Revision: 1358287
URL: http://svn.apache.org/viewvc?rev=1358287&view=rev
Log:
Add in test for write error
Modified:
tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java?rev=1358287&r1=1358286&r2=1358287&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java Fri Jul 6 16:01:36 2012
@@ -296,7 +296,8 @@ public class AsyncStateMachine<S> {
public synchronized void asyncError() {
if (state == AsyncState.DISPATCHED ||
- state == AsyncState.TIMING_OUT) {
+ state == AsyncState.TIMING_OUT ||
+ state == AsyncState.READ_WRITE_OP) {
state = AsyncState.ERROR;
} else {
throw new IllegalStateException(
Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1358287&r1=1358286&r2=1358287&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Fri Jul 6 16:01:36 2012
@@ -16,7 +16,12 @@
*/
package org.apache.catalina.nonblocking;
+import java.io.BufferedInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -133,6 +138,69 @@ public class TestNonBlockingAPI extends
Assert.assertEquals(HttpServletResponse.SC_OK, rc);
}
+
+ @Test
+ public void testNonBlockingWriteError() throws Exception {
+ String bind = "localhost";
+ // Configure a context with digest auth and a single protected resource
+ Tomcat tomcat = getTomcatInstance();
+ // Must have a real docBase - just use temp
+ StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+
+ NBWriteServlet servlet = new NBWriteServlet();
+ String servletName = NBWriteServlet.class.getName();
+ Wrapper servletWrapper = tomcat.addServlet(ctx, servletName, servlet);
+ ctx.addServletMapping("/", servletName);
+ tomcat.getConnector().setProperty("socket.txBufSize", "1024");
+ tomcat.getConnector().setProperty("address", bind);
+ System.out.println(tomcat.getConnector().getProperty("address"));
+ tomcat.start();
+
+ Map<String, List<String>> resHeaders = new HashMap<String, List<String>>();
+ ByteChunk slowReader = new ByteChunk();
+ slowReader.setLimit(1); // FIXME BUFFER IS BROKEN, 0 doesn't work
+ slowReader.setByteOutputChannel(new ByteOutputChannel() {
+ long counter = 0;
+ long delta = 0;
+
+ @Override
+ public void realWriteBytes(byte[] cbuf, int off, int len) throws IOException {
+ try {
+ if (len == 0)
+ return;
+ counter += len;
+ delta += len;
+ if (counter > bytesToDownload) {
+ System.out.println("ERROR Downloaded more than expected ERROR");
+ } else if (counter == bytesToDownload) {
+ System.out.println("Download complete(" + bytesToDownload + " bytes)");
+ // } else if (counter > (1966086)) {
+ // System.out.println("Download almost complete, missing bytes ("+counter+")");
+ } else if (delta > (bytesToDownload / 16)) {
+ System.out.println("Read " + counter + " bytes.");
+ delta = 0;
+ Thread.currentThread().sleep(500);
+ }
+ } catch (Exception x) {
+ throw new IOException(x);
+ }
+ }
+ });
+ int rc = postUrlWithDisconnect(true, new DataWriter(0), "http://" + bind + ":" + getPort() + "/", slowReader, resHeaders,
+ null);
+ slowReader.flushBuffer();
+ Assert.assertEquals(HttpServletResponse.SC_OK, rc);
+ try {
+ //allow the listeners to finish up
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ }
+ Assert.assertTrue("Error listener should have been invoked.", servlet.wlistener.onErrorInvoked);
+
+ }
+
+
+
public static class DataWriter implements BytesStreamer {
final int max = 5;
int count = 0;
@@ -180,8 +248,8 @@ public class TestNonBlockingAPI extends
}
@WebServlet(asyncSupported = true)
- public static class NBReadServlet extends TesterServlet {
-
+ public class NBReadServlet extends TesterServlet {
+ public volatile TestReadListener listener;
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// step 1 - start async
@@ -203,7 +271,7 @@ public class TestNonBlockingAPI extends
@Override
public void onError(AsyncEvent event) throws IOException {
- System.out.println("onError");
+ System.out.println("AsyncListener.onError");
}
@@ -215,11 +283,11 @@ public class TestNonBlockingAPI extends
});
// step 2 - notify on read
ServletInputStream in = req.getInputStream();
- ReadListener rlist = new TestReadListener(actx);
- in.setReadListener(rlist);
+ listener = new TestReadListener(actx);
+ in.setReadListener(listener);
while (in.isReady()) {
- rlist.onDataAvailable();
+ listener.onDataAvailable();
}
// step 3 - notify that we wish to read
// ServletOutputStream out = resp.getOutputStream();
@@ -227,56 +295,13 @@ public class TestNonBlockingAPI extends
}
- private class TestReadListener implements ReadListener {
- AsyncContext ctx;
-
- public TestReadListener(AsyncContext ctx) {
- this.ctx = ctx;
- }
-
- @Override
- public void onDataAvailable() {
- try {
- ServletInputStream in = ctx.getRequest().getInputStream();
- int avail = 0;
- String s = "";
- while ((avail = in.dataAvailable()) > 0) {
- byte[] b = new byte[avail];
- in.read(b);
- s += new String(b);
- }
- System.out.println(s);
- if ("FINISHED".equals(s)) {
- ctx.complete();
- ctx.getResponse().getWriter().print("OK");
- } else {
- in.isReady();
- }
- } catch (Exception x) {
- x.printStackTrace();
- ctx.complete();
- }
-
- }
-
- @Override
- public void onAllDataRead() {
- System.out.println("onAllDataRead");
-
- }
-
- @Override
- public void onError(Throwable throwable) {
- System.out.println("onError");
- throwable.printStackTrace();
-
- }
- }
}
@WebServlet(asyncSupported = true)
- public static class NBWriteServlet extends TesterServlet {
+ public class NBWriteServlet extends TesterServlet {
+ public volatile TestWriteListener wlistener;
+ public volatile TestReadListener rlistener;
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
@@ -299,7 +324,7 @@ public class TestNonBlockingAPI extends
@Override
public void onError(AsyncEvent event) throws IOException {
- System.out.println("onError");
+ System.out.println("AsyncListener.onError");
}
@@ -310,66 +335,179 @@ public class TestNonBlockingAPI extends
}
});
// step 2 - notify on read
- // ServletInputStream in = req.getInputStream();
- // ReadListener rlist = new TestReadListener(actx);
- // in.setReadListener(rlist);
- //
- // while (in.isReady()) {
- // rlist.onDataAvailable();
- // }
- // step 3 - notify that we wish to read
+ ServletInputStream in = req.getInputStream();
+ rlistener = new TestReadListener(actx);
+ in.setReadListener(rlistener);
ServletOutputStream out = resp.getOutputStream();
resp.setBufferSize(200 * 1024);
- TestWriteListener listener = new TestWriteListener(actx);
- out.setWriteListener(listener);
- listener.onWritePossible();
+ wlistener = new TestWriteListener(actx);
+ out.setWriteListener(wlistener);
+ wlistener.onWritePossible();
}
- private class TestWriteListener implements WriteListener {
- long chunk = 1024 * 1024;
- AsyncContext ctx;
- long bytesToDownload = TestNonBlockingAPI.bytesToDownload;
- public TestWriteListener(AsyncContext ctx) {
- this.ctx = ctx;
+ }
+ private class TestReadListener implements ReadListener {
+ AsyncContext ctx;
+ public volatile boolean onErrorInvoked = false;
+
+ public TestReadListener(AsyncContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void onDataAvailable() {
+ try {
+ ServletInputStream in = ctx.getRequest().getInputStream();
+ int avail = 0;
+ String s = "";
+ while ((avail = in.dataAvailable()) > 0) {
+ byte[] b = new byte[avail];
+ in.read(b);
+ s += new String(b);
+ }
+ System.out.println(s);
+ if ("FINISHED".equals(s)) {
+ ctx.complete();
+ ctx.getResponse().getWriter().print("OK");
+ } else {
+ in.isReady();
+ }
+ } catch (Exception x) {
+ x.printStackTrace();
+ ctx.complete();
}
- @Override
- public void onWritePossible() {
- System.out.println("onWritePossible");
- try {
- long left = Math.max(bytesToDownload, 0);
- long start = System.currentTimeMillis();
- long end = System.currentTimeMillis();
- long before = left;
- while (left > 0 && ctx.getResponse().getOutputStream().canWrite()) {
- byte[] b = new byte[(int) Math.min(chunk, bytesToDownload)];
- Arrays.fill(b, (byte) 'X');
- ctx.getResponse().getOutputStream().write(b);
- bytesToDownload -= b.length;
- left = Math.max(bytesToDownload, 0);
- }
- System.out
- .println("Write took:" + (end - start) + " ms. Bytes before=" + before + " after=" + left);
- // only call complete if we have emptied the buffer
- if (left == 0 && ctx.getResponse().getOutputStream().canWrite()) {
- // it is illegal to call complete
- // if there is a write in progress
- ctx.complete();
+ }
+
+ @Override
+ public void onAllDataRead() {
+ System.out.println("onAllDataRead");
+
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ System.out.println("ReadListener.onError");
+ throwable.printStackTrace();
+ onErrorInvoked = true;
+
+ }
+ }
+
+ private class TestWriteListener implements WriteListener {
+ long chunk = 1024 * 1024;
+ AsyncContext ctx;
+ long bytesToDownload = TestNonBlockingAPI.bytesToDownload;
+ public volatile boolean onErrorInvoked = false;
+
+ public TestWriteListener(AsyncContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void onWritePossible() {
+ System.out.println("onWritePossible");
+ try {
+ long left = Math.max(bytesToDownload, 0);
+ long start = System.currentTimeMillis();
+ long end = System.currentTimeMillis();
+ long before = left;
+ while (left > 0 && ctx.getResponse().getOutputStream().canWrite()) {
+ byte[] b = new byte[(int) Math.min(chunk, bytesToDownload)];
+ Arrays.fill(b, (byte) 'X');
+ ctx.getResponse().getOutputStream().write(b);
+ bytesToDownload -= b.length;
+ left = Math.max(bytesToDownload, 0);
+ }
+ System.out.println("Write took:" + (end - start) + " ms. Bytes before=" + before + " after=" + left);
+ // only call complete if we have emptied the buffer
+ if (left == 0 && ctx.getResponse().getOutputStream().canWrite()) {
+ // it is illegal to call complete
+ // if there is a write in progress
+ ctx.complete();
+ }
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ System.out.println("WriteListener.onError");
+ throwable.printStackTrace();
+ onErrorInvoked = true;
+ }
+
+ }
+
+ public static int postUrlWithDisconnect(boolean stream, BytesStreamer streamer, String path, ByteChunk out,
+ Map<String, List<String>> reqHead, Map<String, List<String>> resHead) throws IOException {
+
+ URL url = new URL(path);
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setDoOutput(true);
+ connection.setReadTimeout(1000000);
+ if (reqHead != null) {
+ for (Map.Entry<String, List<String>> entry : reqHead.entrySet()) {
+ StringBuilder valueList = new StringBuilder();
+ for (String value : entry.getValue()) {
+ if (valueList.length() > 0) {
+ valueList.append(',');
}
- } catch (Exception x) {
- x.printStackTrace();
+ valueList.append(value);
}
+ connection.setRequestProperty(entry.getKey(), valueList.toString());
+ }
+ }
+ if (streamer != null && stream) {
+ if (streamer.getLength() > 0) {
+ connection.setFixedLengthStreamingMode(streamer.getLength());
+ } else {
+ connection.setChunkedStreamingMode(1024);
+ }
+ }
+
+ connection.connect();
+ // Write the request body
+ OutputStream os = null;
+ try {
+ os = connection.getOutputStream();
+ while (streamer != null && streamer.available() > 0) {
+ byte[] next = streamer.next();
+ os.write(next);
+ os.flush();
}
- @Override
- public void onError(Throwable throwable) {
- System.out.println("onError");
- throwable.printStackTrace();
+ } finally {
+ if (os != null) {
+ try {
+ os.close();
+ } catch (IOException ioe) {
+ // Ignore
+ }
}
+ }
+ int rc = connection.getResponseCode();
+ if (resHead != null) {
+ Map<String, List<String>> head = connection.getHeaderFields();
+ resHead.putAll(head);
}
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ if (rc == HttpServletResponse.SC_OK) {
+ connection.getInputStream().close();
+ os.close();
+ connection.disconnect();
+ }
+ return rc;
}
+
+
}
Modified: tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java?rev=1358287&r1=1358286&r2=1358287&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java (original)
+++ tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java Fri Jul 6 16:01:36 2012
@@ -290,7 +290,7 @@ public abstract class TomcatBaseTest ext
public static int postUrl(final byte[] body, String path, ByteChunk out,
Map<String, List<String>> reqHead,
Map<String, List<String>> resHead) throws IOException {
- BytesStreamer s = new BytesStreamer() {
+ BytesStreamer s = new BytesStreamer() {
boolean done = false;
@Override
public byte[] next() {
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org