You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2012/12/24 12:17:52 UTC
svn commit: r1425633 -
/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Author: markt
Date: Mon Dec 24 11:17:51 2012
New Revision: 1425633
URL: http://svn.apache.org/viewvc?rev=1425633&view=rev
Log:
Allow multiple threads to process an upgraded connection at the same time.
This is required to support blocking reads/writes. The Servlet[Input|Output]Streams do not expose an API for a blocking [read|write]. Therefore, to block, a thread has to wait until [onDataAvailable()|onWritePossible()] is called. The problem is that the waiting thread holds a lock on the socket and there is no way through the Servlet 3.1 API to release that lock. Until the lock is released, the thread that will eventually call [onDataAvailable()|onWritePossible()] is blocked. So a form of deadlock occurs. To overcome this without requiring libraries such as WebSocket implementations to access container-specific APIs, Tomcat has to allow multiple threads to process an upgraded connection at the same time.
Modified:
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1425633&r1=1425632&r2=1425633&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Dec 24 11:17:51 2012
@@ -1535,116 +1535,126 @@ public class NioEndpoint extends Abstrac
this.status = status;
}
- @SuppressWarnings("null") // key
@Override
public void run() {
+ SelectionKey key = socket.getIOChannel().keyFor(
+ socket.getPoller().getSelector());
+ KeyAttachment ka = null;
+
+ if (key != null) {
+ ka = (KeyAttachment)key.attachment();
+ }
+
+ // Upgraded connections need to allow multiple threads to access the
+ // connection at the same time to enable blocking IO to be used when
+ // NIO has been configured
+ if (ka != null && ka.isUpgraded()) {
+ doRun(key, ka);
+ } else {
+ synchronized (socket) {
+ doRun(key, ka);
+ }
+ }
+ }
+
+ private void doRun(SelectionKey key, KeyAttachment ka) {
boolean launch = false;
- synchronized (socket) {
- SelectionKey key = null;
- try {
- key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- int handshake = -1;
+ try {
+ int handshake = -1;
- try {
- if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
- }catch ( IOException x ) {
- handshake = -1;
- if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
- }catch ( CancelledKeyException ckx ) {
- handshake = -1;
- }
- if ( handshake == 0 ) {
- SocketState state = SocketState.OPEN;
- // Process the request from this socket
- // Suppress null warnings for key in this block since
- // key can't be null in this block
- KeyAttachment ka = (KeyAttachment)key.attachment();
- if (status == null) {
- state = handler.process(ka, SocketStatus.OPEN_READ);
- } else {
- state = handler.process(ka, status);
- }
- if (state == SocketState.CLOSED) {
- // Close socket and pool
- try {
- if (ka!=null) ka.setComet(false);
- socket.getPoller().cancelledKey(key, SocketStatus.ERROR);
- if (running && !paused) {
- nioChannels.push(socket);
- }
- socket = null;
- if (running && !paused && ka != null) {
- keyCache.push(ka);
- }
- ka = null;
- }catch ( Exception x ) {
- log.error("",x);
- }
- } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) {
- //we are async, and we are interested in operations
- ka.getPoller().add(socket, ka.interestOps());
- }
- } else if (handshake == -1 ) {
- KeyAttachment ka = null;
- if (key!=null) {
- ka = (KeyAttachment) key.attachment();
- socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT);
- }
- if (running && !paused) {
- nioChannels.push(socket);
- }
- socket = null;
- if (running && !paused && ka != null) {
- keyCache.push(ka);
- }
- ka = null;
+ try {
+ if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
+ }catch ( IOException x ) {
+ handshake = -1;
+ if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
+ }catch ( CancelledKeyException ckx ) {
+ handshake = -1;
+ }
+ if ( handshake == 0 ) {
+ SocketState state = SocketState.OPEN;
+ // Process the request from this socket
+ // Suppress null warnings for key in this block since
+ // key can't be null in this block
+ if (status == null) {
+ state = handler.process(ka, SocketStatus.OPEN_READ);
} else {
- final SelectionKey fk = key;
- final int intops = handshake;
- final KeyAttachment ka = (KeyAttachment)fk.attachment();
- ka.getPoller().add(socket,intops);
- }
- }catch(CancelledKeyException cx) {
- socket.getPoller().cancelledKey(key,null);
- } catch (OutOfMemoryError oom) {
- try {
- oomParachuteData = null;
- log.error("", oom);
- if (socket != null) {
- socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
- }
- releaseCaches();
- }catch ( Throwable oomt ) {
+ state = handler.process(ka, status);
+ }
+ if (state == SocketState.CLOSED) {
+ // Close socket and pool
try {
- System.err.println(oomParachuteMsg);
- oomt.printStackTrace();
- }catch (Throwable letsHopeWeDontGetHere){
- ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
- }
+ if (ka!=null) ka.setComet(false);
+ socket.getPoller().cancelledKey(key, SocketStatus.ERROR);
+ if (running && !paused) {
+ nioChannels.push(socket);
+ }
+ socket = null;
+ if (running && !paused && ka != null) {
+ keyCache.push(ka);
+ }
+ ka = null;
+ }catch ( Exception x ) {
+ log.error("",x);
+ }
+ } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) {
+ //we are async, and we are interested in operations
+ ka.getPoller().add(socket, ka.interestOps());
+ }
+ } else if (handshake == -1 ) {
+ if (key != null) {
+ socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT);
+ }
+ if (running && !paused) {
+ nioChannels.push(socket);
}
- }catch ( Throwable t ) {
- log.error("",t);
+ socket = null;
+ if (running && !paused && ka != null) {
+ keyCache.push(ka);
+ }
+ ka = null;
+ } else {
+ ka.getPoller().add(socket,handshake);
+ }
+ }catch(CancelledKeyException cx) {
+ socket.getPoller().cancelledKey(key,null);
+ } catch (OutOfMemoryError oom) {
+ try {
+ oomParachuteData = null;
+ log.error("", oom);
if (socket != null) {
socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
}
- } finally {
- if (launch) {
- try {
- getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
- } catch (NullPointerException npe) {
- if (running) {
- log.error(sm.getString("endpoint.launch.fail"),
- npe);
- }
- }
+ releaseCaches();
+ }catch ( Throwable oomt ) {
+ try {
+ System.err.println(oomParachuteMsg);
+ oomt.printStackTrace();
+ }catch (Throwable letsHopeWeDontGetHere){
+ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
- socket = null;
- status = null;
- //return to cache
- if (running && !paused) {
- processorCache.push(this);
+ }
+ }catch ( Throwable t ) {
+ log.error("",t);
+ if (socket != null) {
+ socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
+ }
+ } finally {
+ if (launch) {
+ try {
+ getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
+ } catch (NullPointerException npe) {
+ if (running) {
+ log.error(sm.getString("endpoint.launch.fail"),
+ npe);
+ }
}
}
+ socket = null;
+ status = null;
+ //return to cache
+ if (running && !paused) {
+ processorCache.push(this);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org
Re: svn commit: r1425633 - /tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Posted by Mark Thomas <ma...@apache.org>.
On 30/12/2012 22:20, Mark Thomas wrote:
> On 24/12/2012 11:17, markt@apache.org wrote:
>> Author: markt
>> Date: Mon Dec 24 11:17:51 2012
>> New Revision: 1425633
>>
>> URL: http://svn.apache.org/viewvc?rev=1425633&view=rev
>> Log:
>> Allow multiple threads to process an upgraded connection at the same time.
>> This is required to support blocking reads/writes. The Servlet[Input|Output]Streams do not expose an API for a blocking [read|write]. Therefore, to block, a thread has to wait until [onDataAvailable()|onWritePossible()] is called. The problem is that the waiting thread holds a lock on the socket and there is no way through the Servlet 3.1 API to release that lock. Until the lock is released, the thread that will eventually call [onDataAvailable()|onWritePossible()] is blocked. So a form of deadlock occurs. To overcome this without requiring libraries such as WebSocket implementations to access container-specific APIs, Tomcat has to allow multiple threads to process an upgraded connection at the same time.
>
> While this worked on Windows XP, it doesn't work on Linux which - from
> memory of running the Autobahn tests on the Tomcat 7.0.x web socket
> implementation - has a much better (i.e. faster) network implementation.
>
> The issue is that the Servlet 3.1 non-blocking proposal requires
> multiple container managed threads to be using the socket at the same
> time. I think this is limited to two threads (one reading, one writing)
> but the specification does not explicitly state that so there may be a
> requirement for more. I will be starting a discussion in the EG on this
> point. Regardless, prior to this commit Tomcat enforced a one socket -
> one thread limit and that limit will need to be removed in Tomcat 8.
>
> Some form of re-factoring will be required but before I embark any
> further down that road I want to take some time to review and fully
> understand the existing code. There are various parts that would benefit
> from some additional commentary and I suspect that a little clean-up
> will be possible as well.
During the refactoring, I found a handful of threading issues. Fixing
those appears to have fixed the Autobahn testsuite for Linux.
I still intend to continue with investigating the refactoring as I am
still not really happy with the above commit.
Mark
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org
Re: svn commit: r1425633 - /tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Posted by Mark Thomas <ma...@apache.org>.
On 24/12/2012 11:17, markt@apache.org wrote:
> Author: markt
> Date: Mon Dec 24 11:17:51 2012
> New Revision: 1425633
>
> URL: http://svn.apache.org/viewvc?rev=1425633&view=rev
> Log:
> Allow multiple threads to process an upgraded connection at the same time.
> This is required to support blocking reads/writes. The Servlet[Input|Output]Streams do not expose an API for a blocking [read|write]. Therefore, to block, a thread has to wait until [onDataAvailable()|onWritePossible()] is called. The problem is that the waiting thread holds a lock on the socket and there is no way through the Servlet 3.1 API to release that lock. Until the lock is released, the thread that will eventually call [onDataAvailable()|onWritePossible()] is blocked. So a form of deadlock occurs. To overcome this without requiring libraries such as WebSocket implementations to access container-specific APIs, Tomcat has to allow multiple threads to process an upgraded connection at the same time.
While this worked on Windows XP, it doesn't work on Linux which - from
memory of running the Autobahn tests on the Tomcat 7.0.x web socket
implementation - has a much better (i.e. faster) network implementation.
The issue is that the Servlet 3.1 non-blocking proposal requires
multiple container managed threads to be using the socket at the same
time. I think this is limited to two threads (one reading, one writing)
but the specification does not explicitly state that so there may be a
requirement for more. I will be starting a discussion in the EG on this
point. Regardless, prior to this commit Tomcat enforced a one socket -
one thread limit and that limit will need to be removed in Tomcat 8.
Some form of re-factoring will be required but before I embark any
further down that road I want to take some time to review and fully
understand the existing code. There are various parts that would benefit
from some additional commentary and I suspect that a little clean-up
will be possible as well.
Mark
>
> Modified:
> tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
>
> Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
> URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1425633&r1=1425632&r2=1425633&view=diff
> ==============================================================================
> --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
> +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Dec 24 11:17:51 2012
> @@ -1535,116 +1535,126 @@ public class NioEndpoint extends Abstrac
> this.status = status;
> }
>
> - @SuppressWarnings("null") // key
> @Override
> public void run() {
> + SelectionKey key = socket.getIOChannel().keyFor(
> + socket.getPoller().getSelector());
> + KeyAttachment ka = null;
> +
> + if (key != null) {
> + ka = (KeyAttachment)key.attachment();
> + }
> +
> + // Upgraded connections need to allow multiple threads to access the
> + // connection at the same time to enable blocking IO to be used when
> + // NIO has been configured
> + if (ka != null && ka.isUpgraded()) {
> + doRun(key, ka);
> + } else {
> + synchronized (socket) {
> + doRun(key, ka);
> + }
> + }
> + }
> +
> + private void doRun(SelectionKey key, KeyAttachment ka) {
> boolean launch = false;
> - synchronized (socket) {
> - SelectionKey key = null;
> - try {
> - key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
> - int handshake = -1;
> + try {
> + int handshake = -1;
>
> - try {
> - if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
> - }catch ( IOException x ) {
> - handshake = -1;
> - if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
> - }catch ( CancelledKeyException ckx ) {
> - handshake = -1;
> - }
> - if ( handshake == 0 ) {
> - SocketState state = SocketState.OPEN;
> - // Process the request from this socket
> - // Suppress null warnings for key in this block since
> - // key can't be null in this block
> - KeyAttachment ka = (KeyAttachment)key.attachment();
> - if (status == null) {
> - state = handler.process(ka, SocketStatus.OPEN_READ);
> - } else {
> - state = handler.process(ka, status);
> - }
> - if (state == SocketState.CLOSED) {
> - // Close socket and pool
> - try {
> - if (ka!=null) ka.setComet(false);
> - socket.getPoller().cancelledKey(key, SocketStatus.ERROR);
> - if (running && !paused) {
> - nioChannels.push(socket);
> - }
> - socket = null;
> - if (running && !paused && ka != null) {
> - keyCache.push(ka);
> - }
> - ka = null;
> - }catch ( Exception x ) {
> - log.error("",x);
> - }
> - } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) {
> - //we are async, and we are interested in operations
> - ka.getPoller().add(socket, ka.interestOps());
> - }
> - } else if (handshake == -1 ) {
> - KeyAttachment ka = null;
> - if (key!=null) {
> - ka = (KeyAttachment) key.attachment();
> - socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT);
> - }
> - if (running && !paused) {
> - nioChannels.push(socket);
> - }
> - socket = null;
> - if (running && !paused && ka != null) {
> - keyCache.push(ka);
> - }
> - ka = null;
> + try {
> + if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
> + }catch ( IOException x ) {
> + handshake = -1;
> + if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
> + }catch ( CancelledKeyException ckx ) {
> + handshake = -1;
> + }
> + if ( handshake == 0 ) {
> + SocketState state = SocketState.OPEN;
> + // Process the request from this socket
> + // Suppress null warnings for key in this block since
> + // key can't be null in this block
> + if (status == null) {
> + state = handler.process(ka, SocketStatus.OPEN_READ);
> } else {
> - final SelectionKey fk = key;
> - final int intops = handshake;
> - final KeyAttachment ka = (KeyAttachment)fk.attachment();
> - ka.getPoller().add(socket,intops);
> - }
> - }catch(CancelledKeyException cx) {
> - socket.getPoller().cancelledKey(key,null);
> - } catch (OutOfMemoryError oom) {
> - try {
> - oomParachuteData = null;
> - log.error("", oom);
> - if (socket != null) {
> - socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
> - }
> - releaseCaches();
> - }catch ( Throwable oomt ) {
> + state = handler.process(ka, status);
> + }
> + if (state == SocketState.CLOSED) {
> + // Close socket and pool
> try {
> - System.err.println(oomParachuteMsg);
> - oomt.printStackTrace();
> - }catch (Throwable letsHopeWeDontGetHere){
> - ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
> - }
> + if (ka!=null) ka.setComet(false);
> + socket.getPoller().cancelledKey(key, SocketStatus.ERROR);
> + if (running && !paused) {
> + nioChannels.push(socket);
> + }
> + socket = null;
> + if (running && !paused && ka != null) {
> + keyCache.push(ka);
> + }
> + ka = null;
> + }catch ( Exception x ) {
> + log.error("",x);
> + }
> + } else if (state == SocketState.LONG && ka != null && ka.isAsync() && ka.interestOps() > 0) {
> + //we are async, and we are interested in operations
> + ka.getPoller().add(socket, ka.interestOps());
> + }
> + } else if (handshake == -1 ) {
> + if (key != null) {
> + socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT);
> + }
> + if (running && !paused) {
> + nioChannels.push(socket);
> }
> - }catch ( Throwable t ) {
> - log.error("",t);
> + socket = null;
> + if (running && !paused && ka != null) {
> + keyCache.push(ka);
> + }
> + ka = null;
> + } else {
> + ka.getPoller().add(socket,handshake);
> + }
> + }catch(CancelledKeyException cx) {
> + socket.getPoller().cancelledKey(key,null);
> + } catch (OutOfMemoryError oom) {
> + try {
> + oomParachuteData = null;
> + log.error("", oom);
> if (socket != null) {
> socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
> }
> - } finally {
> - if (launch) {
> - try {
> - getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
> - } catch (NullPointerException npe) {
> - if (running) {
> - log.error(sm.getString("endpoint.launch.fail"),
> - npe);
> - }
> - }
> + releaseCaches();
> + }catch ( Throwable oomt ) {
> + try {
> + System.err.println(oomParachuteMsg);
> + oomt.printStackTrace();
> + }catch (Throwable letsHopeWeDontGetHere){
> + ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
> }
> - socket = null;
> - status = null;
> - //return to cache
> - if (running && !paused) {
> - processorCache.push(this);
> + }
> + }catch ( Throwable t ) {
> + log.error("",t);
> + if (socket != null) {
> + socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
> + }
> + } finally {
> + if (launch) {
> + try {
> + getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
> + } catch (NullPointerException npe) {
> + if (running) {
> + log.error(sm.getString("endpoint.launch.fail"),
> + npe);
> + }
> }
> }
> + socket = null;
> + status = null;
> + //return to cache
> + if (running && !paused) {
> + processorCache.push(this);
> + }
> }
> }
> }
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
> For additional commands, e-mail: dev-help@tomcat.apache.org
>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org