You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2007/02/21 17:37:05 UTC
svn commit: r510092 - in
/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net:
NioBlockingSelector.java NioEndpoint.java NioSelectorPool.java
Author: fhanik
Date: Wed Feb 21 08:37:04 2007
New Revision: 510092
URL: http://svn.apache.org/viewvc?view=rev&rev=510092
Log:
Fixed latch behavior; it could still be improved upon. In the next revision, I'll probably have the blocking read/write selector use its own thread so that it doesn't contend with the poller thread.
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?view=diff&rev=510092&r1=510091&r2=510092
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Wed Feb 21 08:37:04 2007
@@ -65,14 +65,21 @@
KeyAttachment att = (KeyAttachment) key.attachment();
try {
- att.startLatch(1);
- socket.getPoller().add(socket,SelectionKey.OP_WRITE);
+ if ( att.getLatch()==null || att.getLatch().getCount()==0) att.startLatch(1);
+ if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_WRITE);
att.getLatch().await(writeTimeout,TimeUnit.MILLISECONDS);
- att.resetLatch();
}catch (InterruptedException ignore) {
+ Thread.interrupted();
+ }
+ if ( att.getLatch()!=null && att.getLatch().getCount()> 0) {
+ //we got interrupted, but we haven't received notification from the poller.
+ keycount = 0;
+ }else {
+ //latch countdown has happened
+ keycount = 1;
+ att.resetLatch();
}
- if ( att.getLatch() == null ) keycount = 1;
- else keycount = 0;
+
if (writeTimeout > 0 && (keycount == 0))
timedout = (System.currentTimeMillis() - time) >= writeTimeout;
} //while
@@ -122,14 +129,20 @@
}
KeyAttachment att = (KeyAttachment) key.attachment();
try {
- att.startLatch(1);
- socket.getPoller().add(socket,SelectionKey.OP_READ);
+ if ( att.getLatch()==null || att.getLatch().getCount()==0) att.startLatch(1);
+ if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_READ);
att.getLatch().await(readTimeout,TimeUnit.MILLISECONDS);
- att.resetLatch();
}catch (InterruptedException ignore) {
+ Thread.interrupted();
+ }
+ if ( att.getLatch()!=null && att.getLatch().getCount()> 0) {
+ //we got interrupted, but we haven't received notification from the poller.
+ keycount = 0;
+ }else {
+ //latch countdown has happened
+ keycount = 1;
+ att.resetLatch();
}
- if ( att.getLatch() == null ) keycount = 1;
- else keycount = 0;
if (readTimeout > 0 && (keycount == 0))
timedout = (System.currentTimeMillis() - time) >= readTimeout;
} //while
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=510092&r1=510091&r2=510092
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed Feb 21 08:37:04 2007
@@ -593,7 +593,7 @@
serverSock = ServerSocketChannel.open();
InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port));
- serverSock.socket().bind(addr,100); //todo, set backlog value
+ serverSock.socket().bind(addr,backlog);
serverSock.configureBlocking(true); //mimic APR behavior
// Initialize thread count defaults for acceptor, poller and sendfile
@@ -852,6 +852,24 @@
/**
+ * Returns true if a worker thread is available for processing.
+ * @return boolean
+ */
+ protected boolean isWorkerAvailable() {
+ if (workers.size() > 0) {
+ return true;
+ }
+ if ((maxThreads > 0) && (curThreads < maxThreads)) {
+ return true;
+ } else {
+ if (maxThreads < 0) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+ /**
* Create (or allocate) and return an available processor for use in
* processing a specific HTTP request, if possible. If the maximum
* allowed processors have already been created and are in use, return
@@ -1013,6 +1031,8 @@
// Accept the next incoming connection from the server socket
SocketChannel socket = serverSock.accept();
// Hand this socket off to an appropriate processor
+ //TODO FIXME - this is currently a blocking call, meaning we will be blocking
+ //further accepts until there is a thread available.
if ( running && (!paused) && socket != null ) processSocket(socket);
} catch (Throwable t) {
log.error(sm.getString("endpoint.accept.fail"), t);
@@ -1260,23 +1280,35 @@
if ( sk.isValid() && attachment != null ) {
attachment.access();
sk.attach(attachment);
+ int interestOps = sk.interestOps();
sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket
attachment.interestOps(0);
NioChannel channel = attachment.getChannel();
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getComet() ) {
- if (!processSocket(channel, SocketStatus.OPEN))
- processSocket(channel, SocketStatus.DISCONNECT);
+ //check if thread is available
+ if ( isWorkerAvailable() ) {
+ if (!processSocket(channel, SocketStatus.OPEN))
+ processSocket(channel, SocketStatus.DISCONNECT);
+ } else {
+ //reregister it
+ attachment.interestOps(interestOps);
+ sk.interestOps(interestOps);
+ }
} else if ( attachment.getLatch() != null ) {
attachment.getLatch().countDown();
} else {
- //this sucker here dead locks with the count down latch
- //since this call is blocking if no threads are available.
- //TODO: FIXME BIG TIME
- boolean close = (!processSocket(channel));
- if ( close ) {
- channel.close();
- channel.getIOChannel().socket().close();
+ //later on, improve latch behavior
+ if ( isWorkerAvailable() ) {
+ boolean close = (!processSocket(channel));
+ if (close) {
+ channel.close();
+ channel.getIOChannel().socket().close();
+ }
+ } else {
+ //reregister it
+ attachment.interestOps(interestOps);
+ sk.interestOps(interestOps);
}
}
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java?view=diff&rev=510092&r1=510091&r2=510092
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java Wed Feb 21 08:37:04 2007
@@ -41,23 +41,25 @@
protected final static boolean SHARED =
Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue();
protected static Selector SHARED_SELECTOR;
+
+ protected int maxSelectors = 200;
+ protected int maxSpareSelectors = -1;
+ protected boolean enabled = true;
+ protected AtomicInteger active = new AtomicInteger(0);
+ protected AtomicInteger spare = new AtomicInteger(0);
+ protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
+
protected static Selector getSharedSelector() throws IOException {
if (SHARED && SHARED_SELECTOR == null) {
synchronized ( NioSelectorPool.class ) {
if ( SHARED_SELECTOR == null ) {
- SHARED_SELECTOR = Selector.open();
+ SHARED_SELECTOR = Selector.open();
log.info("Using a shared selector for servlet write/read");
- }
+ }
}
}
return SHARED_SELECTOR;
}
- protected int maxSelectors = 200;
- protected int maxSpareSelectors = -1;
- protected boolean enabled = true;
- protected AtomicInteger active = new AtomicInteger(0);
- protected AtomicInteger spare = new AtomicInteger(0);
- protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
public Selector get() throws IOException{
if ( SHARED ) {
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org