You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2012/12/24 12:17:52 UTC

svn commit: r1425633 - /tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Author: markt
Date: Mon Dec 24 11:17:51 2012
New Revision: 1425633

URL: http://svn.apache.org/viewvc?rev=1425633&view=rev
Log:
Allow multiple threads to process an upgraded connection at the same time.
This is required to support blocking reads/writes. The Servlet[Input|Output]Streams do not expose an API for a blocking [read|write]. Therefore to block a thread has to wait until [onDataAvailable()|onWritePossible()] is called. The problem is that the waiting thread holds a lock on the socket and there is no way through the Servlet 3.1 API to release that lock. Until the lock is released the thread that will eventually call [onDataAvailable()|onWritePossible()] is blocked. So a form of deadlock occurs. To overcome this without requiring libraries such as WebSocket implementations to access container specific APIs, Tomcat has to allow multiple threads to process a upgraded connection at the same time.

Modified:
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1425633&r1=1425632&r2=1425633&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Dec 24 11:17:51 2012
@@ -1535,116 +1535,126 @@ public class NioEndpoint extends Abstrac
             this.status = status;
         }
 
-        @SuppressWarnings("null") // key
         @Override
         public void run() {
+            SelectionKey key = socket.getIOChannel().keyFor(
+                    socket.getPoller().getSelector());
+            KeyAttachment ka = null;
+
+            if (key != null) {
+                ka = (KeyAttachment)key.attachment();
+            }
+
+            // Upgraded connections need to allow multiple threads to access the
+            // connection at the same time to enable blocking IO to be used when
+            // NIO has been configured
+            if (ka != null && ka.isUpgraded()) {
+                doRun(key, ka);
+            } else {
+                synchronized (socket) {
+                    doRun(key, ka);
+                }
+            }
+        }
+
+        private void doRun(SelectionKey key, KeyAttachment ka) {
             boolean launch = false;
-            synchronized (socket) {
-                SelectionKey key = null;
-                try {
-                    key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-                    int handshake = -1;
+            try {
+                int handshake = -1;
 
-                    try {
-                        if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
-                    }catch ( IOException x ) {
-                        handshake = -1;
-                        if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
-                    }catch ( CancelledKeyException ckx ) {
-                        handshake = -1;
-                    }
-                    if ( handshake == 0 ) {
-                        SocketState state = SocketState.OPEN;
-                        // Process the request from this socket
-                        // Suppress null warnings for key in this block since
-                        // key can't be null in this block
-                        KeyAttachment ka = (KeyAttachment)key.attachment();
-                        if (status == null) {
-                            state = handler.process(ka, SocketStatus.OPEN_READ);
-                        } else {
-                            state = handler.process(ka, status);
-                        }
-                        if (state == SocketState.CLOSED) {
-                            // Close socket and pool
-                            try {
-                                if (ka!=null) ka.setComet(false);
-                                socket.getPoller().cancelledKey(key, SocketStatus.ERROR);
-                                if (running && !paused) {
-                                    nioChannels.push(socket);
-                                }
-                                socket = null;
-                                if (running && !paused && ka != null) {
-                                    keyCache.push(ka);
-                                }
-                                ka = null;
-                            }catch ( Exception x ) {
-                                log.error("",x);
-                            }
-                        } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) {
-                            //we are async, and we are interested in operations
-                            ka.getPoller().add(socket, ka.interestOps());
-                        }
-                    } else if (handshake == -1 ) {
-                        KeyAttachment ka = null;
-                        if (key!=null) {
-                            ka = (KeyAttachment) key.attachment();
-                            socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT);
-                        }
-                        if (running && !paused) {
-                            nioChannels.push(socket);
-                        }
-                        socket = null;
-                        if (running && !paused && ka != null) {
-                            keyCache.push(ka);
-                        }
-                        ka = null;
+                try {
+                    if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
+                }catch ( IOException x ) {
+                    handshake = -1;
+                    if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
+                }catch ( CancelledKeyException ckx ) {
+                    handshake = -1;
+                }
+                if ( handshake == 0 ) {
+                    SocketState state = SocketState.OPEN;
+                    // Process the request from this socket
+                    // Suppress null warnings for key in this block since
+                    // key can't be null in this block
+                    if (status == null) {
+                        state = handler.process(ka, SocketStatus.OPEN_READ);
                     } else {
-                        final SelectionKey fk = key;
-                        final int intops = handshake;
-                        final KeyAttachment ka = (KeyAttachment)fk.attachment();
-                        ka.getPoller().add(socket,intops);
-                    }
-                }catch(CancelledKeyException cx) {
-                    socket.getPoller().cancelledKey(key,null);
-                } catch (OutOfMemoryError oom) {
-                    try {
-                        oomParachuteData = null;
-                        log.error("", oom);
-                        if (socket != null) {
-                            socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
-                        }
-                        releaseCaches();
-                    }catch ( Throwable oomt ) {
+                        state = handler.process(ka, status);
+                    }
+                    if (state == SocketState.CLOSED) {
+                        // Close socket and pool
                         try {
-                            System.err.println(oomParachuteMsg);
-                            oomt.printStackTrace();
-                        }catch (Throwable letsHopeWeDontGetHere){
-                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
-                        }
+                            if (ka!=null) ka.setComet(false);
+                            socket.getPoller().cancelledKey(key, SocketStatus.ERROR);
+                            if (running && !paused) {
+                                nioChannels.push(socket);
+                            }
+                            socket = null;
+                            if (running && !paused && ka != null) {
+                                keyCache.push(ka);
+                            }
+                            ka = null;
+                        }catch ( Exception x ) {
+                            log.error("",x);
+                        }
+                    } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) {
+                        //we are async, and we are interested in operations
+                        ka.getPoller().add(socket, ka.interestOps());
+                    }
+                } else if (handshake == -1 ) {
+                    if (key != null) {
+                        socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT);
+                    }
+                    if (running && !paused) {
+                        nioChannels.push(socket);
                     }
-                }catch ( Throwable t ) {
-                    log.error("",t);
+                    socket = null;
+                    if (running && !paused && ka != null) {
+                        keyCache.push(ka);
+                    }
+                    ka = null;
+                } else {
+                    ka.getPoller().add(socket,handshake);
+                }
+            }catch(CancelledKeyException cx) {
+                socket.getPoller().cancelledKey(key,null);
+            } catch (OutOfMemoryError oom) {
+                try {
+                    oomParachuteData = null;
+                    log.error("", oom);
                     if (socket != null) {
                         socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
                     }
-                } finally {
-                    if (launch) {
-                        try {
-                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
-                        } catch (NullPointerException npe) {
-                            if (running) {
-                                log.error(sm.getString("endpoint.launch.fail"),
-                                        npe);
-                            }
-                        }
+                    releaseCaches();
+                }catch ( Throwable oomt ) {
+                    try {
+                        System.err.println(oomParachuteMsg);
+                        oomt.printStackTrace();
+                    }catch (Throwable letsHopeWeDontGetHere){
+                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                     }
-                    socket = null;
-                    status = null;
-                    //return to cache
-                    if (running && !paused) {
-                        processorCache.push(this);
+                }
+            }catch ( Throwable t ) {
+                log.error("",t);
+                if (socket != null) {
+                    socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
+                }
+            } finally {
+                if (launch) {
+                    try {
+                        getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
+                    } catch (NullPointerException npe) {
+                        if (running) {
+                            log.error(sm.getString("endpoint.launch.fail"),
+                                    npe);
+                        }
                     }
                 }
+                socket = null;
+                status = null;
+                //return to cache
+                if (running && !paused) {
+                    processorCache.push(this);
+                }
             }
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: svn commit: r1425633 - /tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Posted by Mark Thomas <ma...@apache.org>.
On 30/12/2012 22:20, Mark Thomas wrote:
> On 24/12/2012 11:17, markt@apache.org wrote:
>> Author: markt
>> Date: Mon Dec 24 11:17:51 2012
>> New Revision: 1425633
>>
>> URL: http://svn.apache.org/viewvc?rev=1425633&view=rev
>> Log:
>> Allow multiple threads to process an upgraded connection at the same time.
>> This is required to support blocking reads/writes. The Servlet[Input|Output]Streams do not expose an API for a blocking [read|write]. Therefore to block a thread has to wait until [onDataAvailable()|onWritePossible()] is called. The problem is that the waiting thread holds a lock on the socket and there is no way through the Servlet 3.1 API to release that lock. Until the lock is released the thread that will eventually call [onDataAvailable()|onWritePossible()] is blocked. So a form of deadlock occurs. To overcome this without requiring libraries such as WebSocket implementations to access container specific APIs, Tomcat has to allow multiple threads to process a upgraded connection at the same time.
> 
> While this worked on Windows XP, it doesn't work on Linux which - from
> memory of running the Autobahn tests on the Tomcat 7.0.x web socket
> implementation - has a much better (i.e. faster) network implementation.
> 
> The issue is that the Servlet 3.1 non-blocking proposal requires
> multiple container managed threads to be using the socket at the same
> time. I think this is limited to two threads (one reading, one writing)
> but the specification does not explicitly state that so there may be a
> requirement for more. I will be starting a discussion in the EG on this
> point. Regardless, prior to this commit Tomcat enforced a one socket -
> one thread limit and that limit will need to be removed in Tomcat 8.
> 
> Some form of re-factoring will be required but before I embark any
> further down that road I want to take some time to review and fully
> understand the existing code. There are various parts that would benefit
> from some additional commentary and I suspect that a little clean-up
> will be possible as well.

During the refactoring, I found a handful of threading issues. Fixing
those appears to have fixed the Autobahn testsuite for Linux.

I still intend to continue with investigating the refactoring as I am
still not really happy with the above commit.

Mark

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: svn commit: r1425633 - /tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Posted by Mark Thomas <ma...@apache.org>.
On 24/12/2012 11:17, markt@apache.org wrote:
> Author: markt
> Date: Mon Dec 24 11:17:51 2012
> New Revision: 1425633
> 
> URL: http://svn.apache.org/viewvc?rev=1425633&view=rev
> Log:
> Allow multiple threads to process an upgraded connection at the same time.
> This is required to support blocking reads/writes. The Servlet[Input|Output]Streams do not expose an API for a blocking [read|write]. Therefore to block a thread has to wait until [onDataAvailable()|onWritePossible()] is called. The problem is that the waiting thread holds a lock on the socket and there is no way through the Servlet 3.1 API to release that lock. Until the lock is released the thread that will eventually call [onDataAvailable()|onWritePossible()] is blocked. So a form of deadlock occurs. To overcome this without requiring libraries such as WebSocket implementations to access container specific APIs, Tomcat has to allow multiple threads to process a upgraded connection at the same time.

While this worked on Windows XP, it doesn't work on Linux which - from
memory of running the Autobahn tests on the Tomcat 7.0.x web socket
implementation - has a much better (i.e. faster) network implementation.

The issue is that the Servlet 3.1 non-blocking proposal requires
multiple container managed threads to be using the socket at the same
time. I think this is limited to two threads (one reading, one writing)
but the specification does not explicitly state that so there may be a
requirement for more. I will be starting a discussion in the EG on this
point. Regardless, prior to this commit Tomcat enforced a one socket -
one thread limit and that limit will need to be removed in Tomcat 8.

Some form of re-factoring will be required but before I embark any
further down that road I want to take some time to review and fully
understand the existing code. There are various parts that would benefit
from some additional commentary and I suspect that a little clean-up
will be possible as well.

Mark


> 
> Modified:
>     tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
> 
> Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
> URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1425633&r1=1425632&r2=1425633&view=diff
> ==============================================================================
> --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
> +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Dec 24 11:17:51 2012
> @@ -1535,116 +1535,126 @@ public class NioEndpoint extends Abstrac
>              this.status = status;
>          }
>  
> -        @SuppressWarnings("null") // key
>          @Override
>          public void run() {
> +            SelectionKey key = socket.getIOChannel().keyFor(
> +                    socket.getPoller().getSelector());
> +            KeyAttachment ka = null;
> +
> +            if (key != null) {
> +                ka = (KeyAttachment)key.attachment();
> +            }
> +
> +            // Upgraded connections need to allow multiple threads to access the
> +            // connection at the same time to enable blocking IO to be used when
> +            // NIO has been configured
> +            if (ka != null && ka.isUpgraded()) {
> +                doRun(key, ka);
> +            } else {
> +                synchronized (socket) {
> +                    doRun(key, ka);
> +                }
> +            }
> +        }
> +
> +        private void doRun(SelectionKey key, KeyAttachment ka) {
>              boolean launch = false;
> -            synchronized (socket) {
> -                SelectionKey key = null;
> -                try {
> -                    key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
> -                    int handshake = -1;
> +            try {
> +                int handshake = -1;
>  
> -                    try {
> -                        if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
> -                    }catch ( IOException x ) {
> -                        handshake = -1;
> -                        if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
> -                    }catch ( CancelledKeyException ckx ) {
> -                        handshake = -1;
> -                    }
> -                    if ( handshake == 0 ) {
> -                        SocketState state = SocketState.OPEN;
> -                        // Process the request from this socket
> -                        // Suppress null warnings for key in this block since
> -                        // key can't be null in this block
> -                        KeyAttachment ka = (KeyAttachment)key.attachment();
> -                        if (status == null) {
> -                            state = handler.process(ka, SocketStatus.OPEN_READ);
> -                        } else {
> -                            state = handler.process(ka, status);
> -                        }
> -                        if (state == SocketState.CLOSED) {
> -                            // Close socket and pool
> -                            try {
> -                                if (ka!=null) ka.setComet(false);
> -                                socket.getPoller().cancelledKey(key, SocketStatus.ERROR);
> -                                if (running && !paused) {
> -                                    nioChannels.push(socket);
> -                                }
> -                                socket = null;
> -                                if (running && !paused && ka != null) {
> -                                    keyCache.push(ka);
> -                                }
> -                                ka = null;
> -                            }catch ( Exception x ) {
> -                                log.error("",x);
> -                            }
> -                        } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) {
> -                            //we are async, and we are interested in operations
> -                            ka.getPoller().add(socket, ka.interestOps());
> -                        }
> -                    } else if (handshake == -1 ) {
> -                        KeyAttachment ka = null;
> -                        if (key!=null) {
> -                            ka = (KeyAttachment) key.attachment();
> -                            socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT);
> -                        }
> -                        if (running && !paused) {
> -                            nioChannels.push(socket);
> -                        }
> -                        socket = null;
> -                        if (running && !paused && ka != null) {
> -                            keyCache.push(ka);
> -                        }
> -                        ka = null;
> +                try {
> +                    if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
> +                }catch ( IOException x ) {
> +                    handshake = -1;
> +                    if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
> +                }catch ( CancelledKeyException ckx ) {
> +                    handshake = -1;
> +                }
> +                if ( handshake == 0 ) {
> +                    SocketState state = SocketState.OPEN;
> +                    // Process the request from this socket
> +                    // Suppress null warnings for key in this block since
> +                    // key can't be null in this block
> +                    if (status == null) {
> +                        state = handler.process(ka, SocketStatus.OPEN_READ);
>                      } else {
> -                        final SelectionKey fk = key;
> -                        final int intops = handshake;
> -                        final KeyAttachment ka = (KeyAttachment)fk.attachment();
> -                        ka.getPoller().add(socket,intops);
> -                    }
> -                }catch(CancelledKeyException cx) {
> -                    socket.getPoller().cancelledKey(key,null);
> -                } catch (OutOfMemoryError oom) {
> -                    try {
> -                        oomParachuteData = null;
> -                        log.error("", oom);
> -                        if (socket != null) {
> -                            socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
> -                        }
> -                        releaseCaches();
> -                    }catch ( Throwable oomt ) {
> +                        state = handler.process(ka, status);
> +                    }
> +                    if (state == SocketState.CLOSED) {
> +                        // Close socket and pool
>                          try {
> -                            System.err.println(oomParachuteMsg);
> -                            oomt.printStackTrace();
> -                        }catch (Throwable letsHopeWeDontGetHere){
> -                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
> -                        }
> +                            if (ka!=null) ka.setComet(false);
> +                            socket.getPoller().cancelledKey(key, SocketStatus.ERROR);
> +                            if (running && !paused) {
> +                                nioChannels.push(socket);
> +                            }
> +                            socket = null;
> +                            if (running && !paused && ka != null) {
> +                                keyCache.push(ka);
> +                            }
> +                            ka = null;
> +                        }catch ( Exception x ) {
> +                            log.error("",x);
> +                        }
> +                    } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) {
> +                        //we are async, and we are interested in operations
> +                        ka.getPoller().add(socket, ka.interestOps());
> +                    }
> +                } else if (handshake == -1 ) {
> +                    if (key != null) {
> +                        socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT);
> +                    }
> +                    if (running && !paused) {
> +                        nioChannels.push(socket);
>                      }
> -                }catch ( Throwable t ) {
> -                    log.error("",t);
> +                    socket = null;
> +                    if (running && !paused && ka != null) {
> +                        keyCache.push(ka);
> +                    }
> +                    ka = null;
> +                } else {
> +                    ka.getPoller().add(socket,handshake);
> +                }
> +            }catch(CancelledKeyException cx) {
> +                socket.getPoller().cancelledKey(key,null);
> +            } catch (OutOfMemoryError oom) {
> +                try {
> +                    oomParachuteData = null;
> +                    log.error("", oom);
>                      if (socket != null) {
>                          socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
>                      }
> -                } finally {
> -                    if (launch) {
> -                        try {
> -                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
> -                        } catch (NullPointerException npe) {
> -                            if (running) {
> -                                log.error(sm.getString("endpoint.launch.fail"),
> -                                        npe);
> -                            }
> -                        }
> +                    releaseCaches();
> +                }catch ( Throwable oomt ) {
> +                    try {
> +                        System.err.println(oomParachuteMsg);
> +                        oomt.printStackTrace();
> +                    }catch (Throwable letsHopeWeDontGetHere){
> +                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
>                      }
> -                    socket = null;
> -                    status = null;
> -                    //return to cache
> -                    if (running && !paused) {
> -                        processorCache.push(this);
> +                }
> +            }catch ( Throwable t ) {
> +                log.error("",t);
> +                if (socket != null) {
> +                    socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
> +                }
> +            } finally {
> +                if (launch) {
> +                    try {
> +                        getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
> +                    } catch (NullPointerException npe) {
> +                        if (running) {
> +                            log.error(sm.getString("endpoint.launch.fail"),
> +                                    npe);
> +                        }
>                      }
>                  }
> +                socket = null;
> +                status = null;
> +                //return to cache
> +                if (running && !paused) {
> +                    processorCache.push(this);
> +                }
>              }
>          }
>      }
> 
> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
> For additional commands, e-mail: dev-help@tomcat.apache.org
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org