You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2007/02/21 17:37:05 UTC

svn commit: r510092 - in /tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net: NioBlockingSelector.java NioEndpoint.java NioSelectorPool.java

Author: fhanik
Date: Wed Feb 21 08:37:04 2007
New Revision: 510092

URL: http://svn.apache.org/viewvc?view=rev&rev=510092
Log:
Fixed latch behavior, still could be improved upon. In the next revision, I'll probably have the blocking read/write selector to use its own thread so that it doesn't content with the poller thread

Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java?view=diff&rev=510092&r1=510091&r2=510092
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioBlockingSelector.java Wed Feb 21 08:37:04 2007
@@ -65,14 +65,21 @@
                 
                 KeyAttachment att = (KeyAttachment) key.attachment();
                 try {
-                    att.startLatch(1);
-                    socket.getPoller().add(socket,SelectionKey.OP_WRITE);
+                    if ( att.getLatch()==null || att.getLatch().getCount()==0) att.startLatch(1);
+                    if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_WRITE);
                     att.getLatch().await(writeTimeout,TimeUnit.MILLISECONDS);
-                    att.resetLatch();
                 }catch (InterruptedException ignore) {
+                    Thread.interrupted();
+                }
+                if ( att.getLatch()!=null && att.getLatch().getCount()> 0) {
+                    //we got interrupted, but we haven't received notification from the poller.
+                    keycount = 0;
+                }else {
+                    //latch countdown has happened
+                    keycount = 1;
+                    att.resetLatch();
                 }
-                if ( att.getLatch() == null ) keycount = 1;
-                else keycount = 0;
+
                 if (writeTimeout > 0 && (keycount == 0))
                     timedout = (System.currentTimeMillis() - time) >= writeTimeout;
             } //while
@@ -122,14 +129,20 @@
                 }
                 KeyAttachment att = (KeyAttachment) key.attachment();
                 try {
-                    att.startLatch(1);
-                    socket.getPoller().add(socket,SelectionKey.OP_READ);
+                    if ( att.getLatch()==null || att.getLatch().getCount()==0) att.startLatch(1);
+                    if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_READ);
                     att.getLatch().await(readTimeout,TimeUnit.MILLISECONDS);
-                    att.resetLatch();
                 }catch (InterruptedException ignore) {
+                    Thread.interrupted();
+                }
+                if ( att.getLatch()!=null && att.getLatch().getCount()> 0) {
+                    //we got interrupted, but we haven't received notification from the poller.
+                    keycount = 0;
+                }else {
+                    //latch countdown has happened
+                    keycount = 1;
+                    att.resetLatch();
                 }
-                if ( att.getLatch() == null ) keycount = 1;
-                else keycount = 0;
                 if (readTimeout > 0 && (keycount == 0))
                     timedout = (System.currentTimeMillis() - time) >= readTimeout;
             } //while

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=510092&r1=510091&r2=510092
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed Feb 21 08:37:04 2007
@@ -593,7 +593,7 @@
 
         serverSock = ServerSocketChannel.open();
         InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port));
-        serverSock.socket().bind(addr,100); //todo, set backlog value
+        serverSock.socket().bind(addr,backlog); 
         serverSock.configureBlocking(true); //mimic APR behavior
 
         // Initialize thread count defaults for acceptor, poller and sendfile
@@ -852,6 +852,24 @@
 
 
     /**
+     * Returns true if a worker thread is available for processing.
+     * @return boolean
+     */
+    protected boolean isWorkerAvailable() {
+        if (workers.size() > 0) {
+            return true;
+        }
+        if ((maxThreads > 0) && (curThreads < maxThreads)) {
+            return true;
+        } else {
+            if (maxThreads < 0) {
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+    /**
      * Create (or allocate) and return an available processor for use in
      * processing a specific HTTP request, if possible.  If the maximum
      * allowed processors have already been created and are in use, return
@@ -1013,6 +1031,8 @@
                     // Accept the next incoming connection from the server socket
                     SocketChannel socket = serverSock.accept();
                     // Hand this socket off to an appropriate processor
+                    //TODO FIXME - this is currently a blocking call, meaning we will be blocking
+                    //further accepts until there is a thread available.
                     if ( running && (!paused) && socket != null ) processSocket(socket);
                 } catch (Throwable t) {
                     log.error(sm.getString("endpoint.accept.fail"), t);
@@ -1260,23 +1280,35 @@
                         if ( sk.isValid() && attachment != null ) {
                             attachment.access();
                             sk.attach(attachment);
+                            int interestOps = sk.interestOps();
                             sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket
                             attachment.interestOps(0);
                             NioChannel channel = attachment.getChannel();
                             if (sk.isReadable() || sk.isWritable() ) {
                                 if ( attachment.getComet() ) {
-                                    if (!processSocket(channel, SocketStatus.OPEN))
-                                        processSocket(channel, SocketStatus.DISCONNECT);
+                                    //check if thread is available
+                                    if ( isWorkerAvailable() ) {
+                                        if (!processSocket(channel, SocketStatus.OPEN))
+                                            processSocket(channel, SocketStatus.DISCONNECT);
+                                    } else {
+                                        //reregister it
+                                        attachment.interestOps(interestOps);
+                                        sk.interestOps(interestOps);
+                                    }
                                 } else if ( attachment.getLatch() != null ) {
                                     attachment.getLatch().countDown();
                                 } else {
-                                    //this sucker here dead locks with the count down latch
-                                    //since this call is blocking if no threads are available.
-                                    //TODO: FIXME BIG TIME
-                                    boolean close = (!processSocket(channel));
-                                    if ( close ) {
-                                        channel.close();
-                                        channel.getIOChannel().socket().close();
+                                    //later on, improve latch behavior
+                                    if ( isWorkerAvailable() ) {
+                                        boolean close = (!processSocket(channel));
+                                        if (close) {
+                                            channel.close();
+                                            channel.getIOChannel().socket().close();
+                                        }
+                                    } else {
+                                        //reregister it
+                                        attachment.interestOps(interestOps);
+                                        sk.interestOps(interestOps);
                                     }
                                 }
                             } 

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java?view=diff&rev=510092&r1=510091&r2=510092
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java Wed Feb 21 08:37:04 2007
@@ -41,23 +41,25 @@
     protected final static boolean SHARED =
         Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue();
     protected static Selector SHARED_SELECTOR;
+    
+    protected int maxSelectors = 200;
+    protected int maxSpareSelectors = -1;
+    protected boolean enabled = true;
+    protected AtomicInteger active = new AtomicInteger(0);
+    protected AtomicInteger spare = new AtomicInteger(0);
+    protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
+
     protected static Selector getSharedSelector() throws IOException {
         if (SHARED && SHARED_SELECTOR == null) {
             synchronized ( NioSelectorPool.class ) {
                 if ( SHARED_SELECTOR == null )  {
-					SHARED_SELECTOR = Selector.open();
+                    SHARED_SELECTOR = Selector.open();
                     log.info("Using a shared selector for servlet write/read");
-			    }
+                }
             }
         }
         return  SHARED_SELECTOR;
     }
-    protected int maxSelectors = 200;
-    protected int maxSpareSelectors = -1;
-    protected boolean enabled = true;
-    protected AtomicInteger active = new AtomicInteger(0);
-    protected AtomicInteger spare = new AtomicInteger(0);
-    protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
 
     public Selector get() throws IOException{
         if ( SHARED ) {



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org