You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2012/07/06 18:01:36 UTC

svn commit: r1358287 - in /tomcat/trunk: java/org/apache/coyote/AsyncStateMachine.java test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java test/org/apache/catalina/startup/TomcatBaseTest.java

Author: fhanik
Date: Fri Jul  6 16:01:36 2012
New Revision: 1358287

URL: http://svn.apache.org/viewvc?rev=1358287&view=rev
Log:
Add in test for write error 

Modified:
    tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
    tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
    tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java

Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java?rev=1358287&r1=1358286&r2=1358287&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java Fri Jul  6 16:01:36 2012
@@ -296,7 +296,8 @@ public class AsyncStateMachine<S> {
 
     public synchronized void asyncError() {
         if (state == AsyncState.DISPATCHED ||
-                state == AsyncState.TIMING_OUT) {
+                state == AsyncState.TIMING_OUT ||
+                state == AsyncState.READ_WRITE_OP) {
             state = AsyncState.ERROR;
         } else {
             throw new IllegalStateException(

Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1358287&r1=1358286&r2=1358287&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Fri Jul  6 16:01:36 2012
@@ -16,7 +16,12 @@
  */
 package org.apache.catalina.nonblocking;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -133,6 +138,69 @@ public class TestNonBlockingAPI extends 
         Assert.assertEquals(HttpServletResponse.SC_OK, rc);
     }
 
+
+    @Test
+    public void testNonBlockingWriteError() throws Exception {
+        String bind = "localhost";
+        // Configure a context with digest auth and a single protected resource
+        Tomcat tomcat = getTomcatInstance();
+        // Must have a real docBase - just use temp
+        StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+
+        NBWriteServlet servlet = new NBWriteServlet();
+        String servletName = NBWriteServlet.class.getName();
+        Wrapper servletWrapper = tomcat.addServlet(ctx, servletName, servlet);
+        ctx.addServletMapping("/", servletName);
+        tomcat.getConnector().setProperty("socket.txBufSize", "1024");
+        tomcat.getConnector().setProperty("address", bind);
+        System.out.println(tomcat.getConnector().getProperty("address"));
+        tomcat.start();
+
+        Map<String, List<String>> resHeaders = new HashMap<String, List<String>>();
+        ByteChunk slowReader = new ByteChunk();
+        slowReader.setLimit(1); // FIXME BUFFER IS BROKEN, 0 doesn't work
+        slowReader.setByteOutputChannel(new ByteOutputChannel() {
+            long counter = 0;
+            long delta = 0;
+
+            @Override
+            public void realWriteBytes(byte[] cbuf, int off, int len) throws IOException {
+                try {
+                    if (len == 0)
+                        return;
+                    counter += len;
+                    delta += len;
+                    if (counter > bytesToDownload) {
+                        System.out.println("ERROR Downloaded more than expected ERROR");
+                    } else if (counter == bytesToDownload) {
+                        System.out.println("Download complete(" + bytesToDownload + " bytes)");
+                        // } else if (counter > (1966086)) {
+                        // System.out.println("Download almost complete, missing bytes ("+counter+")");
+                    } else if (delta > (bytesToDownload / 16)) {
+                        System.out.println("Read " + counter + " bytes.");
+                        delta = 0;
+                        Thread.currentThread().sleep(500);
+                    }
+                } catch (Exception x) {
+                    throw new IOException(x);
+                }
+            }
+        });
+        int rc = postUrlWithDisconnect(true, new DataWriter(0), "http://" + bind + ":" + getPort() + "/", slowReader, resHeaders,
+                null);
+        slowReader.flushBuffer();
+        Assert.assertEquals(HttpServletResponse.SC_OK, rc);
+        try {
+            //allow the listeners to finish up
+            Thread.sleep(1000);
+        } catch (Exception e) {
+        }
+        Assert.assertTrue("Error listener should have been invoked.", servlet.wlistener.onErrorInvoked);
+
+    }
+
+
+
     public static class DataWriter implements BytesStreamer {
         final int max = 5;
         int count = 0;
@@ -180,8 +248,8 @@ public class TestNonBlockingAPI extends 
     }
 
     @WebServlet(asyncSupported = true)
-    public static class NBReadServlet extends TesterServlet {
-
+    public class NBReadServlet extends TesterServlet {
+        public volatile TestReadListener listener;
         @Override
         protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
             // step 1 - start async
@@ -203,7 +271,7 @@ public class TestNonBlockingAPI extends 
 
                 @Override
                 public void onError(AsyncEvent event) throws IOException {
-                    System.out.println("onError");
+                    System.out.println("AsyncListener.onError");
 
                 }
 
@@ -215,11 +283,11 @@ public class TestNonBlockingAPI extends 
             });
             // step 2 - notify on read
             ServletInputStream in = req.getInputStream();
-            ReadListener rlist = new TestReadListener(actx);
-            in.setReadListener(rlist);
+            listener = new TestReadListener(actx);
+            in.setReadListener(listener);
 
             while (in.isReady()) {
-                rlist.onDataAvailable();
+                listener.onDataAvailable();
             }
             // step 3 - notify that we wish to read
             // ServletOutputStream out = resp.getOutputStream();
@@ -227,56 +295,13 @@ public class TestNonBlockingAPI extends 
 
         }
 
-        private class TestReadListener implements ReadListener {
-            AsyncContext ctx;
-
-            public TestReadListener(AsyncContext ctx) {
-                this.ctx = ctx;
-            }
-
-            @Override
-            public void onDataAvailable() {
-                try {
-                    ServletInputStream in = ctx.getRequest().getInputStream();
-                    int avail = 0;
-                    String s = "";
-                    while ((avail = in.dataAvailable()) > 0) {
-                        byte[] b = new byte[avail];
-                        in.read(b);
-                        s += new String(b);
-                    }
-                    System.out.println(s);
-                    if ("FINISHED".equals(s)) {
-                        ctx.complete();
-                        ctx.getResponse().getWriter().print("OK");
-                    } else {
-                        in.isReady();
-                    }
-                } catch (Exception x) {
-                    x.printStackTrace();
-                    ctx.complete();
-                }
-
-            }
-
-            @Override
-            public void onAllDataRead() {
-                System.out.println("onAllDataRead");
-
-            }
-
-            @Override
-            public void onError(Throwable throwable) {
-                System.out.println("onError");
-                throwable.printStackTrace();
-
-            }
-        }
 
     }
 
     @WebServlet(asyncSupported = true)
-    public static class NBWriteServlet extends TesterServlet {
+    public class NBWriteServlet extends TesterServlet {
+        public volatile TestWriteListener wlistener;
+        public volatile TestReadListener rlistener;
 
         @Override
         protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
@@ -299,7 +324,7 @@ public class TestNonBlockingAPI extends 
 
                 @Override
                 public void onError(AsyncEvent event) throws IOException {
-                    System.out.println("onError");
+                    System.out.println("AsyncListener.onError");
 
                 }
 
@@ -310,66 +335,179 @@ public class TestNonBlockingAPI extends 
                 }
             });
             // step 2 - notify on read
-            // ServletInputStream in = req.getInputStream();
-            // ReadListener rlist = new TestReadListener(actx);
-            // in.setReadListener(rlist);
-            //
-            // while (in.isReady()) {
-            // rlist.onDataAvailable();
-            // }
-            // step 3 - notify that we wish to read
+            ServletInputStream in = req.getInputStream();
+            rlistener = new TestReadListener(actx);
+            in.setReadListener(rlistener);
             ServletOutputStream out = resp.getOutputStream();
             resp.setBufferSize(200 * 1024);
-            TestWriteListener listener = new TestWriteListener(actx);
-            out.setWriteListener(listener);
-            listener.onWritePossible();
+            wlistener = new TestWriteListener(actx);
+            out.setWriteListener(wlistener);
+            wlistener.onWritePossible();
         }
 
-        private class TestWriteListener implements WriteListener {
-            long chunk = 1024 * 1024;
-            AsyncContext ctx;
-            long bytesToDownload = TestNonBlockingAPI.bytesToDownload;
 
-            public TestWriteListener(AsyncContext ctx) {
-                this.ctx = ctx;
+    }
+    private class TestReadListener implements ReadListener {
+        AsyncContext ctx;
+        public volatile boolean onErrorInvoked = false;
+
+        public TestReadListener(AsyncContext ctx) {
+            this.ctx = ctx;
+        }
+
+        @Override
+        public void onDataAvailable() {
+            try {
+                ServletInputStream in = ctx.getRequest().getInputStream();
+                int avail = 0;
+                String s = "";
+                while ((avail = in.dataAvailable()) > 0) {
+                    byte[] b = new byte[avail];
+                    in.read(b);
+                    s += new String(b);
+                }
+                System.out.println(s);
+                if ("FINISHED".equals(s)) {
+                    ctx.complete();
+                    ctx.getResponse().getWriter().print("OK");
+                } else {
+                    in.isReady();
+                }
+            } catch (Exception x) {
+                x.printStackTrace();
+                ctx.complete();
             }
 
-            @Override
-            public void onWritePossible() {
-                System.out.println("onWritePossible");
-                try {
-                    long left = Math.max(bytesToDownload, 0);
-                    long start = System.currentTimeMillis();
-                    long end = System.currentTimeMillis();
-                    long before = left;
-                    while (left > 0 && ctx.getResponse().getOutputStream().canWrite()) {
-                        byte[] b = new byte[(int) Math.min(chunk, bytesToDownload)];
-                        Arrays.fill(b, (byte) 'X');
-                        ctx.getResponse().getOutputStream().write(b);
-                        bytesToDownload -= b.length;
-                        left = Math.max(bytesToDownload, 0);
-                    }
-                    System.out
-                            .println("Write took:" + (end - start) + " ms. Bytes before=" + before + " after=" + left);
-                    // only call complete if we have emptied the buffer
-                    if (left == 0 && ctx.getResponse().getOutputStream().canWrite()) {
-                        // it is illegal to call complete
-                        // if there is a write in progress
-                        ctx.complete();
+        }
+
+        @Override
+        public void onAllDataRead() {
+            System.out.println("onAllDataRead");
+
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            System.out.println("ReadListener.onError");
+            throwable.printStackTrace();
+            onErrorInvoked = true;
+
+        }
+    }
+
+    private class TestWriteListener implements WriteListener {
+        long chunk = 1024 * 1024;
+        AsyncContext ctx;
+        long bytesToDownload = TestNonBlockingAPI.bytesToDownload;
+        public volatile boolean onErrorInvoked = false;
+
+        public TestWriteListener(AsyncContext ctx) {
+            this.ctx = ctx;
+        }
+
+        @Override
+        public void onWritePossible() {
+            System.out.println("onWritePossible");
+            try {
+                long left = Math.max(bytesToDownload, 0);
+                long start = System.currentTimeMillis();
+                long end = System.currentTimeMillis();
+                long before = left;
+                while (left > 0 && ctx.getResponse().getOutputStream().canWrite()) {
+                    byte[] b = new byte[(int) Math.min(chunk, bytesToDownload)];
+                    Arrays.fill(b, (byte) 'X');
+                    ctx.getResponse().getOutputStream().write(b);
+                    bytesToDownload -= b.length;
+                    left = Math.max(bytesToDownload, 0);
+                }
+                System.out.println("Write took:" + (end - start) + " ms. Bytes before=" + before + " after=" + left);
+                // only call complete if we have emptied the buffer
+                if (left == 0 && ctx.getResponse().getOutputStream().canWrite()) {
+                    // it is illegal to call complete
+                    // if there is a write in progress
+                    ctx.complete();
+                }
+            } catch (Exception x) {
+                x.printStackTrace();
+            }
+
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            System.out.println("WriteListener.onError");
+            throwable.printStackTrace();
+            onErrorInvoked = true;
+        }
+
+    }
+
+    public static int postUrlWithDisconnect(boolean stream, BytesStreamer streamer, String path, ByteChunk out,
+            Map<String, List<String>> reqHead, Map<String, List<String>> resHead) throws IOException {
+
+        URL url = new URL(path);
+        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+        connection.setDoOutput(true);
+        connection.setReadTimeout(1000000);
+        if (reqHead != null) {
+            for (Map.Entry<String, List<String>> entry : reqHead.entrySet()) {
+                StringBuilder valueList = new StringBuilder();
+                for (String value : entry.getValue()) {
+                    if (valueList.length() > 0) {
+                        valueList.append(',');
                     }
-                } catch (Exception x) {
-                    x.printStackTrace();
+                    valueList.append(value);
                 }
+                connection.setRequestProperty(entry.getKey(), valueList.toString());
+            }
+        }
+        if (streamer != null && stream) {
+            if (streamer.getLength() > 0) {
+                connection.setFixedLengthStreamingMode(streamer.getLength());
+            } else {
+                connection.setChunkedStreamingMode(1024);
+            }
+        }
+
+        connection.connect();
 
+        // Write the request body
+        OutputStream os = null;
+        try {
+            os = connection.getOutputStream();
+            while (streamer != null && streamer.available() > 0) {
+                byte[] next = streamer.next();
+                os.write(next);
+                os.flush();
             }
 
-            @Override
-            public void onError(Throwable throwable) {
-                System.out.println("onError");
-                throwable.printStackTrace();
+        } finally {
+            if (os != null) {
+                try {
+                    os.close();
+                } catch (IOException ioe) {
+                    // Ignore
+                }
             }
+        }
 
+        int rc = connection.getResponseCode();
+        if (resHead != null) {
+            Map<String, List<String>> head = connection.getHeaderFields();
+            resHead.putAll(head);
         }
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
 
+        }
+        if (rc == HttpServletResponse.SC_OK) {
+            connection.getInputStream().close();
+            os.close();
+            connection.disconnect();
+        }
+        return rc;
     }
+
+
 }

Modified: tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java?rev=1358287&r1=1358286&r2=1358287&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java (original)
+++ tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java Fri Jul  6 16:01:36 2012
@@ -290,7 +290,7 @@ public abstract class TomcatBaseTest ext
     public static int postUrl(final byte[] body, String path, ByteChunk out,
             Map<String, List<String>> reqHead,
             Map<String, List<String>> resHead) throws IOException {
-        BytesStreamer s = new BytesStreamer() {
+            BytesStreamer s = new BytesStreamer() {
             boolean done = false;
             @Override
             public byte[] next() {



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org