You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2012/11/24 18:35:47 UTC

svn commit: r1413212 - in /tomcat/trunk/java/org/apache/coyote/http11/upgrade: UpgradeAprProcessor.java UpgradeNioProcessor.java

Author: markt
Date: Sat Nov 24 17:35:46 2012
New Revision: 1413212

URL: http://svn.apache.org/viewvc?rev=1413212&view=rev
Log:
First cut of HTTP upgrade for NIO/APR
- Non-blocking I/O is not supported yet
- WebSocket is currently broken

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java?rev=1413212&r1=1413211&r2=1413212&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java Sat Nov 24 17:35:46 2012
@@ -26,76 +26,86 @@ import org.apache.tomcat.util.net.Socket
 
 public class UpgradeAprProcessor extends UpgradeProcessor<Long> {
 
-    private final long socket;
-
+    private static final int INFINITE_TIMEOUT = -1;
 
     public UpgradeAprProcessor(SocketWrapper<Long> wrapper,
             ProtocolHandler httpUpgradeProcessor) {
-        super(upgradeInbound);
-
-        Socket.timeoutSet(wrapper.getSocket().longValue(),
-                upgradeInbound.getReadTimeout());
+        super(httpUpgradeProcessor,
+                new AprUpgradeServletInputStream(wrapper.getSocket().longValue()),
+                new AprUpgradeServletOutputStream(wrapper.getSocket().longValue()));
 
-        this.socket = wrapper.getSocket().longValue();
+        Socket.timeoutSet(wrapper.getSocket().longValue(), INFINITE_TIMEOUT);
     }
 
 
-    /*
-     * Output methods
-     */
-    @Override
-    public void flush() throws IOException {
-        // NOOP
-    }
+    // ----------------------------------------------------------- Inner classes
 
+    private static class AprUpgradeServletInputStream
+            extends UpgradeServletInputStream {
 
-    @Override
-    public void write(int b) throws IOException {
-        Socket.send(socket, new byte[] {(byte) b}, 0, 1);
-    }
+        private final long socket;
 
+        public AprUpgradeServletInputStream(long socket) {
+            this.socket = socket;
+        }
 
-    @Override
-    public void write(byte[]b, int off, int len) throws IOException {
-        Socket.send(socket, b, off, len);
+        @Override
+        protected int doRead() throws IOException {
+            byte[] bytes = new byte[1];
+            int result = Socket.recv(socket, bytes, 0, 1);
+            if (result == -1) {
+                return -1;
+            } else {
+                return bytes[0] & 0xFF;
+            }
+        }
+
+        @Override
+        protected int doRead(byte[] b, int off, int len) throws IOException {
+            boolean block = true;
+            if (!block) {
+                Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1);
+            }
+            try {
+                int result = Socket.recv(socket, b, off, len);
+                if (result > 0) {
+                    return result;
+                } else if (-result == Status.EAGAIN) {
+                    return 0;
+                } else {
+                    throw new IOException(sm.getString("apr.error",
+                            Integer.valueOf(-result)));
+                }
+            } finally {
+                if (!block) {
+                    Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+                }
+            }
+        }
     }
 
+    private static class AprUpgradeServletOutputStream
+            extends UpgradeServletOutputStream {
+
+        private final long socket;
 
-    /*
-     * Input methods
-     */
-    @Override
-    public int read() throws IOException {
-        byte[] bytes = new byte[1];
-        int result = Socket.recv(socket, bytes, 0, 1);
-        if (result == -1) {
-            return -1;
-        } else {
-            return bytes[0] & 0xFF;
+        public AprUpgradeServletOutputStream(long socket) {
+            this.socket = socket;
         }
-    }
 
+        @Override
+        protected void doWrite(int b) throws IOException {
+            Socket.send(socket, new byte[] {(byte) b}, 0, 1);
+        }
 
-    @Override
-    public int read(boolean block, byte[] bytes, int off, int len)
-            throws IOException {
-        if (!block) {
-            Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1);
-        }
-        try {
-            int result = Socket.recv(socket, bytes, off, len);
-            if (result > 0) {
-                return result;
-            } else if (-result == Status.EAGAIN) {
-                return 0;
-            } else {
-                throw new IOException(sm.getString("apr.error",
-                        Integer.valueOf(-result)));
-            }
-        } finally {
-            if (!block) {
-                Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
-            }
+        @Override
+        protected void doWrite(byte[] b, int off, int len) throws IOException {
+            Socket.send(socket, b, off, len);
+        }
+
+        @Override
+        protected void doFlush() throws IOException {
+            // NO-OP
         }
     }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java?rev=1413212&r1=1413211&r2=1413212&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java Sat Nov 24 17:35:46 2012
@@ -30,204 +30,219 @@ import org.apache.tomcat.util.net.Socket
 
 public class UpgradeNioProcessor extends UpgradeProcessor<NioChannel> {
 
-    private final NioChannel nioChannel;
-    private final NioSelectorPool pool;
-    private final int maxRead;
-    private final int maxWrite;
+    private static final int INFINITE_TIMEOUT = -1;
 
     public UpgradeNioProcessor(SocketWrapper<NioChannel> wrapper,
             ProtocolHandler httpUpgradeProcessor, NioSelectorPool pool) {
-        super(upgradeInbound);
+        super(httpUpgradeProcessor,
+                new NioUpgradeServletInputStream(wrapper, pool),
+                new NioUpgradeServletOutputStream(wrapper, pool));
 
-        wrapper.setTimeout(upgradeInbound.getReadTimeout());
-
-        this.nioChannel = wrapper.getSocket();
-        this.pool = pool;
-        this.maxRead = nioChannel.getBufHandler().getReadBuffer().capacity();
-        this.maxWrite = nioChannel.getBufHandler().getWriteBuffer().capacity();
+        wrapper.setTimeout(INFINITE_TIMEOUT);
     }
 
 
-    /*
-     * Output methods
-     */
-    @Override
-    public void flush() throws IOException {
-        NioEndpoint.KeyAttachment att =
-                (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
-        if (att == null) {
-            throw new IOException("Key must be cancelled");
-        }
-        long writeTimeout = att.getTimeout();
-        Selector selector = null;
-        try {
-            selector = pool.get();
-        } catch ( IOException x ) {
-            //ignore
-        }
-        try {
-            do {
-                if (nioChannel.flush(true, selector, writeTimeout)) {
-                    break;
-                }
-            } while (true);
-        } finally {
-            if (selector != null) {
-                pool.put(selector);
+    // ----------------------------------------------------------- Inner classes
+
+    private static class NioUpgradeServletInputStream
+            extends UpgradeServletInputStream {
+
+        private final NioChannel nioChannel;
+        private final NioSelectorPool pool;
+        private final int maxRead;
+
+        public NioUpgradeServletInputStream(SocketWrapper<NioChannel> wrapper,
+                NioSelectorPool pool) {
+            nioChannel = wrapper.getSocket();
+            this.pool = pool;
+            maxRead = nioChannel.getBufHandler().getReadBuffer().capacity();
+        }
+
+        @Override
+        protected int doRead() throws IOException {
+            byte[] bytes = new byte[1];
+            int result = readSocket(true, bytes, 0, 1);
+            if (result == -1) {
+                return -1;
+            } else {
+                return bytes[0] & 0xFF;
             }
         }
-    }
 
-    @Override
-    public void write(int b) throws IOException {
-        writeToSocket(new byte[] {(byte) b}, 0, 1);
-    }
+        @Override
+        protected int doRead(byte[] b, int off, int len) throws IOException {
+            if (len > maxRead) {
+                return readSocket(true, b, off, maxRead);
+            } else {
+                return readSocket(true, b, off, len);
+            }
+        }
+
+        private int readSocket(boolean block, byte[] b, int off, int len)
+                throws IOException {
+
+            ByteBuffer readBuffer = nioChannel.getBufHandler().getReadBuffer();
+            int remaining = readBuffer.remaining();
+
+            // Is there enough data in the read buffer to satisfy this request?
+            if (remaining >= len) {
+                readBuffer.get(b, off, len);
+                return len;
+            }
+
+            // Copy what data there is in the read buffer to the byte array
+            int leftToWrite = len;
+            int newOffset = off;
+            if (remaining > 0) {
+                readBuffer.get(b, off, remaining);
+                leftToWrite -= remaining;
+                newOffset += remaining;
+            }
+
+            // Fill the read buffer as best we can
+            readBuffer.clear();
+            int nRead = fillReadBuffer(block);
+
+            // Full as much of the remaining byte array as possible with the data
+            // that was just read
+            if (nRead > 0) {
+                readBuffer.flip();
+                readBuffer.limit(nRead);
+                if (nRead > leftToWrite) {
+                    readBuffer.get(b, newOffset, leftToWrite);
+                    leftToWrite = 0;
+                } else {
+                    readBuffer.get(b, newOffset, nRead);
+                    leftToWrite -= nRead;
+                }
+            } else if (nRead == 0) {
+                readBuffer.flip();
+                readBuffer.limit(nRead);
+            } else if (nRead == -1) {
+                throw new EOFException(sm.getString("nio.eof.error"));
+            }
 
-    @Override
-    public void write(byte[]b, int off, int len) throws IOException {
-        int written = 0;
-        while (len - written > maxWrite) {
-            written += writeToSocket(b, off + written, maxWrite);
+            return len - leftToWrite;
         }
-        writeToSocket(b, off + written, len - written);
-    }
 
-    /*
-     * Input methods
-     */
-    @Override
-    public int read() throws IOException {
-        byte[] bytes = new byte[1];
-        int result = readSocket(true, bytes, 0, 1);
-        if (result == -1) {
-            return -1;
-        } else {
-            return bytes[0] & 0xFF;
+        private int fillReadBuffer(boolean block) throws IOException {
+            int nRead;
+            if (block) {
+                Selector selector = null;
+                try {
+                    selector = pool.get();
+                } catch ( IOException x ) {
+                    // Ignore
+                }
+                try {
+                    NioEndpoint.KeyAttachment att =
+                            (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
+                    if (att == null) {
+                        throw new IOException("Key must be cancelled.");
+                    }
+                    nRead = pool.read(nioChannel.getBufHandler().getReadBuffer(),
+                            nioChannel, selector, att.getTimeout());
+                } catch (EOFException eof) {
+                    nRead = -1;
+                } finally {
+                    if (selector != null) {
+                        pool.put(selector);
+                    }
+                }
+            } else {
+                nRead = nioChannel.read(nioChannel.getBufHandler().getReadBuffer());
+            }
+            return nRead;
         }
     }
 
-    @Override
-    public int read(boolean block, byte[] bytes, int off, int len)
-            throws IOException {
-        if (len > maxRead) {
-            return readSocket(block, bytes, off, maxRead);
-        } else {
-            return readSocket(block, bytes, off, len);
+    private static class NioUpgradeServletOutputStream
+            extends UpgradeServletOutputStream {
+
+        private final NioChannel nioChannel;
+        private final NioSelectorPool pool;
+        private final int maxWrite;
+
+        public NioUpgradeServletOutputStream(
+                SocketWrapper<NioChannel> wrapper, NioSelectorPool pool) {
+            nioChannel = wrapper.getSocket();
+            this.pool = pool;
+            maxWrite = nioChannel.getBufHandler().getWriteBuffer().capacity();
         }
-    }
 
+        @Override
+        protected void doWrite(int b) throws IOException {
+            writeToSocket(new byte[] {(byte) b}, 0, 1);
+        }
 
-    /*
-     * Adapted from the NioInputBuffer.
-     */
-    private int readSocket(boolean block, byte[] bytes, int offset, int len)
-            throws IOException {
-
-        ByteBuffer readBuffer = nioChannel.getBufHandler().getReadBuffer();
-        int remaining = readBuffer.remaining();
-
-        // Is there enough data in the read buffer to satisfy this request?
-        if (remaining >= len) {
-            readBuffer.get(bytes, offset, len);
-            return len;
-        }
-
-        // Copy what data there is in the read buffer to the byte array
-        int leftToWrite = len;
-        int newOffset = offset;
-        if (remaining > 0) {
-            readBuffer.get(bytes, offset, remaining);
-            leftToWrite -= remaining;
-            newOffset += remaining;
-        }
-
-        // Fill the read buffer as best we can
-        readBuffer.clear();
-        int nRead = fillReadBuffer(block);
-
-        // Full as much of the remaining byte array as possible with the data
-        // that was just read
-        if (nRead > 0) {
-            readBuffer.flip();
-            readBuffer.limit(nRead);
-            if (nRead > leftToWrite) {
-                readBuffer.get(bytes, newOffset, leftToWrite);
-                leftToWrite = 0;
-            } else {
-                readBuffer.get(bytes, newOffset, nRead);
-                leftToWrite -= nRead;
+        @Override
+        protected void doWrite(byte[] b, int off, int len) throws IOException {
+            int written = 0;
+            while (len - written > maxWrite) {
+                written += writeToSocket(b, off + written, maxWrite);
             }
-        } else if (nRead == 0) {
-            readBuffer.flip();
-            readBuffer.limit(nRead);
-        } else if (nRead == -1) {
-            throw new EOFException(sm.getString("nio.eof.error"));
+            writeToSocket(b, off + written, len - written);
         }
 
-        return len - leftToWrite;
-    }
-
-    private int fillReadBuffer(boolean block) throws IOException {
-        int nRead;
-        if (block) {
+        @Override
+        protected void doFlush() throws IOException {
+            NioEndpoint.KeyAttachment att =
+                    (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
+            if (att == null) {
+                throw new IOException("Key must be cancelled");
+            }
+            long writeTimeout = att.getTimeout();
             Selector selector = null;
             try {
                 selector = pool.get();
             } catch ( IOException x ) {
-                // Ignore
+                //ignore
             }
             try {
-                NioEndpoint.KeyAttachment att =
-                        (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
-                if (att == null) {
-                    throw new IOException("Key must be cancelled.");
-                }
-                nRead = pool.read(nioChannel.getBufHandler().getReadBuffer(),
-                        nioChannel, selector, att.getTimeout());
-            } catch (EOFException eof) {
-                nRead = -1;
+                do {
+                    if (nioChannel.flush(true, selector, writeTimeout)) {
+                        break;
+                    }
+                } while (true);
             } finally {
                 if (selector != null) {
                     pool.put(selector);
                 }
             }
-        } else {
-            nRead = nioChannel.read(nioChannel.getBufHandler().getReadBuffer());
         }
-        return nRead;
-    }
 
-    /*
-     * Adapted from the NioOutputBuffer
-     */
-    private synchronized int writeToSocket(byte[] bytes, int off, int len)
-            throws IOException {
-
-        nioChannel.getBufHandler().getWriteBuffer().clear();
-        nioChannel.getBufHandler().getWriteBuffer().put(bytes, off, len);
-        nioChannel.getBufHandler().getWriteBuffer().flip();
-
-        int written = 0;
-        NioEndpoint.KeyAttachment att =
-                (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
-        if (att == null) {
-            throw new IOException("Key must be cancelled");
-        }
-        long writeTimeout = att.getTimeout();
-        Selector selector = null;
-        try {
-            selector = pool.get();
-        } catch ( IOException x ) {
-            //ignore
-        }
-        try {
-            written = pool.write(nioChannel.getBufHandler().getWriteBuffer(),
-                    nioChannel, selector, writeTimeout, true);
-        } finally {
-            if (selector != null) {
-                pool.put(selector);
+        /*
+         * Adapted from the NioOutputBuffer
+         */
+        private synchronized int writeToSocket(byte[] bytes, int off, int len)
+                throws IOException {
+
+            nioChannel.getBufHandler().getWriteBuffer().clear();
+            nioChannel.getBufHandler().getWriteBuffer().put(bytes, off, len);
+            nioChannel.getBufHandler().getWriteBuffer().flip();
+
+            int written = 0;
+            NioEndpoint.KeyAttachment att =
+                    (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
+            if (att == null) {
+                throw new IOException("Key must be cancelled");
+            }
+            long writeTimeout = att.getTimeout();
+            Selector selector = null;
+            try {
+                selector = pool.get();
+            } catch ( IOException x ) {
+                //ignore
+            }
+            try {
+                written = pool.write(nioChannel.getBufHandler().getWriteBuffer(),
+                        nioChannel, selector, writeTimeout, true);
+            } finally {
+                if (selector != null) {
+                    pool.put(selector);
+                }
             }
+            return written;
         }
-        return written;
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org