You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/08/22 12:11:27 UTC

svn commit: r1516407 - /tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Author: markt
Date: Thu Aug 22 10:11:27 2013
New Revision: 1516407

URL: http://svn.apache.org/r1516407
Log:
Back-port NIO endpoint changes required to support concurrent read/write for JSR-356 upgraded connections

Modified:
    tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1516407&r1=1516406&r2=1516407&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Aug 22 10:11:27 2013
@@ -1260,7 +1260,7 @@ public class NioEndpoint extends Abstrac
                                         processSocket(channel, SocketStatus.DISCONNECT, true);
                                 } else {
                                     //future placement of a WRITE notif
-                                    if (!processSocket(channel, SocketStatus.OPEN_READ, true))
+                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE, true))
                                         processSocket(channel, SocketStatus.DISCONNECT, true);
                                 }
                             } else {
@@ -1269,8 +1269,23 @@ public class NioEndpoint extends Abstrac
                         } else {
                             //later on, improve latch behavior
                             if ( isWorkerAvailable() ) {
-                                unreg(sk, attachment,sk.readyOps());
-                                boolean close = (!processSocket(channel, null, true));
+
+                                boolean readAndWrite = sk.isReadable() && sk.isWritable();
+                                reg(sk, attachment, 0);
+                                if (attachment.isAsync() && readAndWrite) {
+                                    //remember that we want to know about write too
+                                    attachment.interestOps(SelectionKey.OP_WRITE);
+                                }
+                                //read goes before write
+                                if (sk.isReadable()) {
+                                    //read notification
+                                    if (!processSocket(channel, SocketStatus.OPEN_READ, true))
+                                        close = true;
+                                } else {
+                                    //future placement of a WRITE notif
+                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE, true))
+                                        close = true;
+                                }
                                 if (close) {
                                     cancelledKey(sk,SocketStatus.DISCONNECT,false);
                                 }
@@ -1319,7 +1334,9 @@ public class NioEndpoint extends Abstrac
                         cancelledKey(sk,SocketStatus.ERROR,false);
                         return false;
                     }
-                    sd.fchannel = new FileInputStream(f).getChannel();
+                    @SuppressWarnings("resource") // Closed when channel is closed
+                    FileInputStream fis = new FileInputStream(f);
+                    sd.fchannel = fis.getChannel();
                 }
 
                 //configure output channel
@@ -1670,104 +1687,136 @@ public class NioEndpoint extends Abstrac
 
         @Override
         public void run() {
+            SelectionKey key = socket.getIOChannel().keyFor(
+                    socket.getPoller().getSelector());
+            KeyAttachment ka = null;
+
+            if (key != null) {
+                ka = (KeyAttachment)key.attachment();
+            }
+
+            // Upgraded connections need to allow multiple threads to access the
+            // connection at the same time to enable blocking IO to be used when
+            // NIO has been configured
+            if (ka != null && ka.isUpgraded() &&
+                    SocketStatus.OPEN_WRITE == status) {
+                synchronized (ka.getWriteThreadLock()) {
+                    doRun(key, ka);
+                }
+            } else {
+                synchronized (socket) {
+                    doRun(key, ka);
+                }
+            }
+        }
+
+        private void doRun(SelectionKey key, KeyAttachment ka) {
             boolean launch = false;
-            synchronized (socket) {
-                SelectionKey key = null;
-                try {
-                    key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-                    int handshake = -1;
+            try {
+                int handshake = -1;
 
-                    try {
-                        if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
-                    }catch ( IOException x ) {
-                        handshake = -1;
-                        if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
-                    }catch ( CancelledKeyException ckx ) {
-                        handshake = -1;
-                    }
-                    if ( handshake == 0 ) {
-                        SocketState state = SocketState.OPEN;
-                        // Process the request from this socket
-                        if (status == null) {
-                            state = handler.process(
-                                    (KeyAttachment) key.attachment(),
-                                    SocketStatus.OPEN_READ);
+                try {
+                    if (key != null) {
+                        // For STOP there is no point trying to handshake as the
+                        // Poller has been stopped.
+                        if (socket.isHandshakeComplete() ||
+                                status == SocketStatus.STOP) {
+                            handshake = 0;
                         } else {
-                            state = handler.process(
-                                    (KeyAttachment) key.attachment(),
-                                    status);
+                            handshake = socket.handshake(
+                                    key.isReadable(), key.isWritable());
+                            // The handshake process reads/writes from/to the
+                            // socket. status may therefore be OPEN_WRITE once
+                            // the handshake completes. However, the handshake
+                            // happens when the socket is opened so the status
+                            // must always be OPEN_READ after it completes. It
+                            // is OK to always set this as it is only used if
+                            // the handshake completes.
+                            status = SocketStatus.OPEN_READ;
                         }
-
-                        if (state == SocketState.CLOSED) {
-                            // Close socket and pool
-                            try {
-                                KeyAttachment ka = null;
-                                if (key!=null) {
-                                    ka = (KeyAttachment) key.attachment();
-                                    if (ka!=null) ka.setComet(false);
-                                    socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
-                                }
+                    }
+                }catch ( IOException x ) {
+                    handshake = -1;
+                    if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
+                }catch ( CancelledKeyException ckx ) {
+                    handshake = -1;
+                }
+                if ( handshake == 0 ) {
+                    SocketState state = SocketState.OPEN;
+                    // Process the request from this socket
+                    if (status == null) {
+                        state = handler.process(ka, SocketStatus.OPEN_READ);
+                    } else {
+                        state = handler.process(ka, status);
+                    }
+                    if (state == SocketState.CLOSED) {
+                        // Close socket and pool
+                        try {
+                            if (ka!=null) ka.setComet(false);
+                            socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
+                            if (running && !paused) {
                                 nioChannels.offer(socket);
-                                socket = null;
-                                if ( ka!=null ) keyCache.offer(ka);
-                                ka = null;
-                            }catch ( Exception x ) {
-                                log.error("",x);
                             }
+                            socket = null;
+                            if (running && !paused && ka!=null) {
+                                keyCache.offer(ka);
+                            }
+                            ka = null;
+                        } catch ( Exception x ) {
+                            log.error("",x);
                         }
-                    } else if (handshake == -1 ) {
-                        KeyAttachment ka = null;
-                        if (key!=null) {
-                            ka = (KeyAttachment) key.attachment();
-                            socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
-                        }
-                        nioChannels.offer(socket);
-                        socket = null;
-                        if ( ka!=null ) keyCache.offer(ka);
-                        ka = null;
-                    } else {
-                        final SelectionKey fk = key;
-                        final int intops = handshake;
-                        final KeyAttachment ka = (KeyAttachment)fk.attachment();
-                        ka.getPoller().add(socket,intops);
-                    }
-                }catch(CancelledKeyException cx) {
-                    socket.getPoller().cancelledKey(key,null,false);
-                } catch (OutOfMemoryError oom) {
+                    }
+                } else if (handshake == -1 ) {
+                    if (key != null) {
+                        socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
+                    }
+                    nioChannels.offer(socket);
+                    socket = null;
+                    if ( ka!=null ) keyCache.offer(ka);
+                    ka = null;
+                } else {
+                    ka.getPoller().add(socket, handshake);
+                }
+            }catch(CancelledKeyException cx) {
+                socket.getPoller().cancelledKey(key,null,false);
+            } catch (OutOfMemoryError oom) {
+                try {
+                    oomParachuteData = null;
+                    log.error("", oom);
+                    if (socket != null) {
+                        socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);
+                    }
+                    releaseCaches();
+                }catch ( Throwable oomt ) {
                     try {
-                        oomParachuteData = null;
-                        log.error("", oom);
-                        if (socket != null) {
-                            socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);
-                        }
-                        releaseCaches();
-                    }catch ( Throwable oomt ) {
-                        try {
-                            System.err.println(oomParachuteMsg);
-                            oomt.printStackTrace();
-                        }catch (Throwable letsHopeWeDontGetHere){
-                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
-                        }
+                        System.err.println(oomParachuteMsg);
+                        oomt.printStackTrace();
+                    }catch (Throwable letsHopeWeDontGetHere){
+                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                     }
-                } catch (VirtualMachineError vme) {
-                    ExceptionUtils.handleThrowable(vme);
-                }catch ( Throwable t ) {
-                    log.error("",t);
+                }
+            } catch (VirtualMachineError vme) {
+                ExceptionUtils.handleThrowable(vme);
+            }catch ( Throwable t ) {
+                log.error("",t);
+                if (socket != null) {
                     socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
-                } finally {
-                    if (launch) {
-                        try {
-                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
-                        } catch (NullPointerException npe) {
-                            if (running) {
-                                log.error(sm.getString("endpoint.launch.fail"),
-                                        npe);
-                            }
+                }
+            } finally {
+                if (launch) {
+                    try {
+                        getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
+                    } catch (NullPointerException npe) {
+                        if (running) {
+                            log.error(sm.getString("endpoint.launch.fail"),
+                                    npe);
                         }
                     }
-                    socket = null;
-                    status = null;
-                    //return to cache
+                }
+                socket = null;
+                status = null;
+                //return to cache
+                if (running && !paused) {
                     processorCache.offer(this);
                 }
             }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org