You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/05/10 11:35:53 UTC

svn commit: r1480963 - in /tomcat/trunk: java/org/apache/coyote/http11/InternalAprInputBuffer.java java/org/apache/coyote/http11/InternalAprOutputBuffer.java test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java

Author: markt
Date: Fri May 10 09:35:53 2013
New Revision: 1480963

URL: http://svn.apache.org/r1480963
Log:
Non-blocking Servlet 3.1 reads for APR

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
    tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java?rev=1480963&r1=1480962&r2=1480963&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java Fri May 10 09:35:53 2013
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.coyote.InputBuffer;
 import org.apache.coyote.Request;
@@ -89,9 +91,10 @@ public class InternalAprInputBuffer exte
     private long socket;
 
 
-    // --------------------------------------------------------- Public Methods
+    private SocketWrapper<Long> wrapper;
 
 
+    // --------------------------------------------------------- Public Methods
 
     /**
      * Recycle the input buffer. This should be called when closing the
@@ -100,6 +103,7 @@ public class InternalAprInputBuffer exte
     @Override
     public void recycle() {
         socket = 0;
+        wrapper = null;
         super.recycle();
     }
 
@@ -133,7 +137,7 @@ public class InternalAprInputBuffer exte
                 if (useAvailableData) {
                     return false;
                 }
-                if (!fill())
+                if (!fill(true))
                     throw new EOFException(sm.getString("iib.eof.error"));
             }
 
@@ -150,7 +154,7 @@ public class InternalAprInputBuffer exte
             if (useAvailableData) {
                 return false;
             }
-            if (!fill())
+            if (!fill(true))
                 throw new EOFException(sm.getString("iib.eof.error"));
         }
 
@@ -165,7 +169,7 @@ public class InternalAprInputBuffer exte
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill())
+                if (!fill(true))
                     throw new EOFException(sm.getString("iib.eof.error"));
             }
 
@@ -188,7 +192,7 @@ public class InternalAprInputBuffer exte
         while (space) {
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill())
+                if (!fill(true))
                     throw new EOFException(sm.getString("iib.eof.error"));
             }
             if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
@@ -213,7 +217,7 @@ public class InternalAprInputBuffer exte
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill())
+                if (!fill(true))
                     throw new EOFException(sm.getString("iib.eof.error"));
             }
 
@@ -249,7 +253,7 @@ public class InternalAprInputBuffer exte
         while (space) {
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill())
+                if (!fill(true))
                     throw new EOFException(sm.getString("iib.eof.error"));
             }
             if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
@@ -273,7 +277,7 @@ public class InternalAprInputBuffer exte
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill())
+                if (!fill(true))
                     throw new EOFException(sm.getString("iib.eof.error"));
             }
 
@@ -340,7 +344,7 @@ public class InternalAprInputBuffer exte
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill())
+                if (!fill(true))
                     throw new EOFException(sm.getString("iib.eof.error"));
             }
 
@@ -374,7 +378,7 @@ public class InternalAprInputBuffer exte
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill())
+                if (!fill(true))
                     throw new EOFException(sm.getString("iib.eof.error"));
             }
 
@@ -416,7 +420,7 @@ public class InternalAprInputBuffer exte
 
                 // Read new bytes if needed
                 if (pos >= lastValid) {
-                    if (!fill())
+                    if (!fill(true))
                         throw new EOFException(sm.getString("iib.eof.error"));
                 }
 
@@ -435,7 +439,7 @@ public class InternalAprInputBuffer exte
 
                 // Read new bytes if needed
                 if (pos >= lastValid) {
-                    if (!fill())
+                    if (!fill(true))
                         throw new EOFException(sm.getString("iib.eof.error"));
                 }
 
@@ -463,7 +467,7 @@ public class InternalAprInputBuffer exte
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill())
+                if (!fill(true))
                     throw new EOFException(sm.getString("iib.eof.error"));
             }
 
@@ -499,7 +503,7 @@ public class InternalAprInputBuffer exte
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill())
+                if (!fill(true))
                     throw new EOFException(sm.getString("iib.eof.error"));
             }
 
@@ -545,24 +549,20 @@ public class InternalAprInputBuffer exte
             AbstractEndpoint endpoint) throws IOException {
 
         socket = socketWrapper.getSocket().longValue();
+        wrapper = socketWrapper;
         Socket.setrbb(this.socket, bbuf);
     }
 
 
-    @Override
-    protected boolean fill(boolean block) throws IOException {
-        // Ignore the block parameter and just call fill
-        return fill();
-    }
-
-
     /**
-     * Fill the internal buffer using data from the underlying input stream.
+     * Attempts to read some data into the input buffer.
      *
-     * @return false if at end of stream
+     * @return <code>true</code> if more data was added to the input buffer
+     *         otherwise <code>false</code>
      */
-    protected boolean fill()
-        throws IOException {
+    @Override
+    protected boolean fill(boolean block) throws IOException {
+        // Ignore the block parameter
 
         int nRead = 0;
 
@@ -574,7 +574,7 @@ public class InternalAprInputBuffer exte
             }
 
             bbuf.clear();
-            nRead = Socket.recvbb(socket, 0, buf.length - lastValid);
+            nRead = doReadSocket(true);
             if (nRead > 0) {
                 bbuf.limit(nRead);
                 bbuf.get(buf, pos, nRead);
@@ -599,7 +599,7 @@ public class InternalAprInputBuffer exte
             pos = end;
             lastValid = pos;
             bbuf.clear();
-            nRead = Socket.recvbb(socket, 0, buf.length - lastValid);
+            nRead = doReadSocket(true);
             if (nRead > 0) {
                 bbuf.limit(nRead);
                 bbuf.get(buf, pos, nRead);
@@ -618,15 +618,69 @@ public class InternalAprInputBuffer exte
         }
 
         return (nRead > 0);
-
     }
 
 
     @Override
     protected int nbRead() throws IOException {
-        return 0;
-        // TODO
-        // throw new UnsupportedOperationException("APR non-blocking read");
+        bbuf.clear();
+        int nRead = doReadSocket(false);
+
+        if (nRead > 0) {
+            bbuf.limit(nRead);
+            bbuf.get(buf, pos, nRead);
+            lastValid = pos + nRead;
+            return nRead;
+        } else if (-nRead == Status.EAGAIN) {
+            return 0;
+        } else {
+            throw new IOException(sm.getString("iib.failedread",
+                    Integer.valueOf(-nRead)));
+        }
+    }
+
+
+    private int doReadSocket(boolean block) {
+
+        Lock readLock = wrapper.getBlockingStatusReadLock();
+        WriteLock writeLock = wrapper.getBlockingStatusWriteLock();
+
+        boolean readDone = false;
+        int result = 0;
+        try {
+            readLock.lock();
+            if (wrapper.getBlockingStatus() == block) {
+                result = Socket.recvbb(socket, 0, buf.length - lastValid);
+                readDone = true;
+            }
+        } finally {
+            readLock.unlock();
+        }
+
+        if (!readDone) {
+            try {
+                writeLock.lock();
+                wrapper.setBlockingStatus(block);
+                // Set the current settings for this socket
+                Socket.optSet(socket, Socket.APR_SO_NONBLOCK, (block ? 0 : 1));
+                // Downgrade the lock
+                try {
+                    readLock.lock();
+                    writeLock.unlock();
+                    result = Socket.recvbb(socket, 0, buf.length - lastValid);
+                } finally {
+                    readLock.unlock();
+                }
+            } finally {
+                // Should have been released above but may not have been on some
+                // exception paths
+                if (writeLock.isHeldByCurrentThread()) {
+                    writeLock.unlock();
+                }
+            }
+        }
+
+        return result;
     }
 
 
@@ -648,7 +702,7 @@ public class InternalAprInputBuffer exte
             throws IOException {
 
             if (pos >= lastValid) {
-                if (!fill())
+                if (!fill(true))
                     return -1;
             }
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1480963&r1=1480962&r2=1480963&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Fri May 10 09:35:53 2013
@@ -125,6 +125,7 @@ public class InternalAprOutputBuffer ext
         bbuf.clear();
         flipped = false;
 
+        socket = 0;
         wrapper = null;
     }
 

Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1480963&r1=1480962&r2=1480963&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Fri May 10 09:35:53 2013
@@ -57,14 +57,9 @@ public class TestNonBlockingAPI extends 
     public void testNonBlockingRead() throws Exception {
         Tomcat tomcat = getTomcatInstance();
 
-        // TODO Non-blocking reads are not yet implemented for APR.
-        if (tomcat.getConnector().getProtocolHandlerClassName().equals(
-                "org.apache.coyote.http11.Http11AprProtocol")) {
-            return;
-        }
-
         // Must have a real docBase - just use temp
-        StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+        StandardContext ctx = (StandardContext) tomcat.addContext("",
+                System.getProperty("java.io.tmpdir"));
 
         NBReadServlet servlet = new NBReadServlet();
         String servletName = NBReadServlet.class.getName();
@@ -74,8 +69,8 @@ public class TestNonBlockingAPI extends 
         tomcat.start();
 
         Map<String, List<String>> resHeaders = new HashMap<>();
-        int rc = postUrl(true, new DataWriter(500), "http://localhost:" + getPort() + "/", new ByteChunk(),
-                resHeaders, null);
+        int rc = postUrl(true, new DataWriter(500), "http://localhost:" +
+                getPort() + "/", new ByteChunk(), resHeaders, null);
         Assert.assertEquals(HttpServletResponse.SC_OK, rc);
     }
 
@@ -84,7 +79,8 @@ public class TestNonBlockingAPI extends 
     public void testNonBlockingWrite() throws Exception {
         Tomcat tomcat = getTomcatInstance();
         // Must have a real docBase - just use temp
-        StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+        StandardContext ctx = (StandardContext) tomcat.addContext("",
+                System.getProperty("java.io.tmpdir"));
 
         NBWriteServlet servlet = new NBWriteServlet();
         String servletName = NBWriteServlet.class.getName();



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org