You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/08/22 12:11:27 UTC
svn commit: r1516407 -
/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Author: markt
Date: Thu Aug 22 10:11:27 2013
New Revision: 1516407
URL: http://svn.apache.org/r1516407
Log:
Back-port NIO endpoint changes required to support concurrent read/write for JSR-356 upgraded connections
Modified:
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1516407&r1=1516406&r2=1516407&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Aug 22 10:11:27 2013
@@ -1260,7 +1260,7 @@ public class NioEndpoint extends Abstrac
processSocket(channel, SocketStatus.DISCONNECT, true);
} else {
//future placement of a WRITE notif
- if (!processSocket(channel, SocketStatus.OPEN_READ, true))
+ if (!processSocket(channel, SocketStatus.OPEN_WRITE, true))
processSocket(channel, SocketStatus.DISCONNECT, true);
}
} else {
@@ -1269,8 +1269,23 @@ public class NioEndpoint extends Abstrac
} else {
//later on, improve latch behavior
if ( isWorkerAvailable() ) {
- unreg(sk, attachment,sk.readyOps());
- boolean close = (!processSocket(channel, null, true));
+
+ boolean readAndWrite = sk.isReadable() && sk.isWritable();
+ reg(sk, attachment, 0);
+ if (attachment.isAsync() && readAndWrite) {
+ //remember the that we want to know about write too
+ attachment.interestOps(SelectionKey.OP_WRITE);
+ }
+ //read goes before write
+ if (sk.isReadable()) {
+ //read notification
+ if (!processSocket(channel, SocketStatus.OPEN_READ, true))
+ close = true;
+ } else {
+ //future placement of a WRITE notif
+ if (!processSocket(channel, SocketStatus.OPEN_WRITE, true))
+ close = true;
+ }
if (close) {
cancelledKey(sk,SocketStatus.DISCONNECT,false);
}
@@ -1319,7 +1334,9 @@ public class NioEndpoint extends Abstrac
cancelledKey(sk,SocketStatus.ERROR,false);
return false;
}
- sd.fchannel = new FileInputStream(f).getChannel();
+ @SuppressWarnings("resource") // Closed when channel is closed
+ FileInputStream fis = new FileInputStream(f);
+ sd.fchannel = fis.getChannel();
}
//configure output channel
@@ -1670,104 +1687,136 @@ public class NioEndpoint extends Abstrac
@Override
public void run() {
+ SelectionKey key = socket.getIOChannel().keyFor(
+ socket.getPoller().getSelector());
+ KeyAttachment ka = null;
+
+ if (key != null) {
+ ka = (KeyAttachment)key.attachment();
+ }
+
+ // Upgraded connections need to allow multiple threads to access the
+ // connection at the same time to enable blocking IO to be used when
+ // NIO has been configured
+ if (ka != null && ka.isUpgraded() &&
+ SocketStatus.OPEN_WRITE == status) {
+ synchronized (ka.getWriteThreadLock()) {
+ doRun(key, ka);
+ }
+ } else {
+ synchronized (socket) {
+ doRun(key, ka);
+ }
+ }
+ }
+
+ private void doRun(SelectionKey key, KeyAttachment ka) {
boolean launch = false;
- synchronized (socket) {
- SelectionKey key = null;
- try {
- key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- int handshake = -1;
+ try {
+ int handshake = -1;
- try {
- if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
- }catch ( IOException x ) {
- handshake = -1;
- if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
- }catch ( CancelledKeyException ckx ) {
- handshake = -1;
- }
- if ( handshake == 0 ) {
- SocketState state = SocketState.OPEN;
- // Process the request from this socket
- if (status == null) {
- state = handler.process(
- (KeyAttachment) key.attachment(),
- SocketStatus.OPEN_READ);
+ try {
+ if (key != null) {
+ // For STOP there is no point trying to handshake as the
+ // Poller has been stopped.
+ if (socket.isHandshakeComplete() ||
+ status == SocketStatus.STOP) {
+ handshake = 0;
} else {
- state = handler.process(
- (KeyAttachment) key.attachment(),
- status);
+ handshake = socket.handshake(
+ key.isReadable(), key.isWritable());
+ // The handshake process reads/writes from/to the
+ // socket. status may therefore be OPEN_WRITE once
+ // the handshake completes. However, the handshake
+ // happens when the socket is opened so the status
+ // must always be OPEN_READ after it completes. It
+ // is OK to always set this as it is only used if
+ // the handshake completes.
+ status = SocketStatus.OPEN_READ;
}
-
- if (state == SocketState.CLOSED) {
- // Close socket and pool
- try {
- KeyAttachment ka = null;
- if (key!=null) {
- ka = (KeyAttachment) key.attachment();
- if (ka!=null) ka.setComet(false);
- socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
- }
+ }
+ }catch ( IOException x ) {
+ handshake = -1;
+ if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
+ }catch ( CancelledKeyException ckx ) {
+ handshake = -1;
+ }
+ if ( handshake == 0 ) {
+ SocketState state = SocketState.OPEN;
+ // Process the request from this socket
+ if (status == null) {
+ state = handler.process(ka, SocketStatus.OPEN_READ);
+ } else {
+ state = handler.process(ka, status);
+ }
+ if (state == SocketState.CLOSED) {
+ // Close socket and pool
+ try {
+ if (ka!=null) ka.setComet(false);
+ socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
+ if (running && !paused) {
nioChannels.offer(socket);
- socket = null;
- if ( ka!=null ) keyCache.offer(ka);
- ka = null;
- }catch ( Exception x ) {
- log.error("",x);
}
+ socket = null;
+ if (running && !paused && ka!=null) {
+ keyCache.offer(ka);
+ }
+ ka = null;
+ } catch ( Exception x ) {
+ log.error("",x);
}
- } else if (handshake == -1 ) {
- KeyAttachment ka = null;
- if (key!=null) {
- ka = (KeyAttachment) key.attachment();
- socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
- }
- nioChannels.offer(socket);
- socket = null;
- if ( ka!=null ) keyCache.offer(ka);
- ka = null;
- } else {
- final SelectionKey fk = key;
- final int intops = handshake;
- final KeyAttachment ka = (KeyAttachment)fk.attachment();
- ka.getPoller().add(socket,intops);
- }
- }catch(CancelledKeyException cx) {
- socket.getPoller().cancelledKey(key,null,false);
- } catch (OutOfMemoryError oom) {
+ }
+ } else if (handshake == -1 ) {
+ if (key != null) {
+ socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
+ }
+ nioChannels.offer(socket);
+ socket = null;
+ if ( ka!=null ) keyCache.offer(ka);
+ ka = null;
+ } else {
+ ka.getPoller().add(socket, handshake);
+ }
+ }catch(CancelledKeyException cx) {
+ socket.getPoller().cancelledKey(key,null,false);
+ } catch (OutOfMemoryError oom) {
+ try {
+ oomParachuteData = null;
+ log.error("", oom);
+ if (socket != null) {
+ socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);
+ }
+ releaseCaches();
+ }catch ( Throwable oomt ) {
try {
- oomParachuteData = null;
- log.error("", oom);
- if (socket != null) {
- socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);
- }
- releaseCaches();
- }catch ( Throwable oomt ) {
- try {
- System.err.println(oomParachuteMsg);
- oomt.printStackTrace();
- }catch (Throwable letsHopeWeDontGetHere){
- ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
- }
+ System.err.println(oomParachuteMsg);
+ oomt.printStackTrace();
+ }catch (Throwable letsHopeWeDontGetHere){
+ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
- } catch (VirtualMachineError vme) {
- ExceptionUtils.handleThrowable(vme);
- }catch ( Throwable t ) {
- log.error("",t);
+ }
+ } catch (VirtualMachineError vme) {
+ ExceptionUtils.handleThrowable(vme);
+ }catch ( Throwable t ) {
+ log.error("",t);
+ if (socket != null) {
socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
- } finally {
- if (launch) {
- try {
- getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
- } catch (NullPointerException npe) {
- if (running) {
- log.error(sm.getString("endpoint.launch.fail"),
- npe);
- }
+ }
+ } finally {
+ if (launch) {
+ try {
+ getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
+ } catch (NullPointerException npe) {
+ if (running) {
+ log.error(sm.getString("endpoint.launch.fail"),
+ npe);
}
}
- socket = null;
- status = null;
- //return to cache
+ }
+ socket = null;
+ status = null;
+ //return to cache
+ if (running && !paused) {
processorCache.offer(this);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org