You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2008/06/14 17:36:49 UTC

svn commit: r667817 - in /tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net: SelectorCallback.java SelectorThread.java SelectorThreadNio.java

Author: costin
Date: Sat Jun 14 08:36:49 2008
New Revision: 667817

URL: http://svn.apache.org/viewvc?rev=667817&view=rev
Log:
Selector abstraction, apr version not ready yet ( bugs - but seems a bit faster, so worth finishing ).

Modified:
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java

Modified: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java?rev=667817&r1=667816&r2=667817&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java Sat Jun 14 08:36:49 2008
@@ -18,6 +18,8 @@
 import java.io.IOException;
 import java.nio.channels.Channel;
 
+import org.apache.tomcat.util.net.SelectorThread.SelectorData;
+
 /**
  * Notiy user code of events. All methods are called from the selector thread,
  * they should not block. The reason is to allow parsing and non-blocking 
@@ -30,16 +32,11 @@
  * ( older version used long - but non-blocking connect needs a second param )
  */
 public class SelectorCallback {
-  protected SelectorThread.SelectorData selectorData = new SelectorThread.SelectorData(this);
-  
-  public SelectorThread getSelector() {
-    return selectorData.sel;
-  }
   
   /** 
    * Called when the protocol is connected.
    */
-  public void connected(SelectorThread selThread) 
+  public void connected(SelectorData selThread) 
           throws IOException {
   }
 
@@ -47,27 +44,34 @@
    * It is possible to write data. 
    * For both read and write - re-enable interest if you want more data. 
    */
-  public void dataWriteable(SelectorThread selThread) throws IOException {
+  public void dataWriteable(SelectorData selThread) throws IOException {
   }
 
   /**
    * Data available for read.
    * For both read and write - re-enable interest if you want more data. 
    */
-  public void dataReceived(SelectorThread selThread) throws IOException {
+  public void dataReceived(SelectorData selThread) throws IOException {
   }
   
   /** 
    * nextTimeEvent reached. 
    */
-  public void timeEvent(SelectorThread selThread) {
+  public void timeEvent(SelectorData selThread) {
+  }
+
+  /**
+ * @throws IOException  
+   *  
+   */
+  public void ioThreadRun(SelectorData selThread) throws IOException {
   }
 
   /** 
    * Close was detected, or an unhandled exception happened while processing
    * this callback.
    */
-  public void channelClosed(SelectorThread selThread, Throwable ex) {
+  public void channelClosed(SelectorData selThread, Throwable ex) {
   }
   
   /**
@@ -79,7 +83,7 @@
    * TODO: is there any case where something else besides registering read
    * interest on the new connection is needed ? Maybe it could read some data ?
    */
-  public SelectorCallback connectionAccepted(SelectorThread selThread, 
+  public SelectorCallback connectionAccepted(SelectorData selThread, 
                                              Channel sockC) {
     return null;
   }

Modified: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java?rev=667817&r1=667816&r2=667817&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java Sat Jun 14 08:36:49 2008
@@ -20,8 +20,6 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.Channel;
 
-import org.apache.tomcat.util.buf.ByteChunk;
-
 /**
  * Abstract NIO/APR to avoid some of the complexity and allow more code
  * sharing and experiments. 
@@ -44,17 +42,19 @@
   /** 
    * This is stored as the attachment in the selector.
    */
-  static class SelectorData {
-    SelectorData(SelectorCallback selectorCallback) {
-      this.callback = selectorCallback;
+  public static class SelectorData {
+    public SelectorData(SelectorThread sel) {
+      this.sel = sel;
     }
     
     // APR long is wrapped in a ByteChannel as well - with few other longs.
     Channel channelData;
-    
-    SelectorThread sel;
     Object selKey;
-    SelectorCallback callback;
+    
+    public SelectorThread sel;
+    public SelectorCallback callback;
+    
+    SelectorCallback pendingCallback;
     
     // Current interest, used internally to avoid waking up if no change
     // Also used for connect and accept.
@@ -72,14 +72,16 @@
     
     // Saved to allow debug messages for bad interest/looping
     int lastReadResult;
+    int zeroReads = 0;
     int lastWriteResult;
     
     public String toString() {
         StringBuffer sb = new StringBuffer();
         sb.append("SelData: ")
             .append(writeInterest ? "W/" : "")
-            .append(readInterest ? "R/" : "").append(selKey).append("/")
+            .append(readInterest ? "R/" : "").append("/")
             .append(channelData);
+        //append(selKey).
         return sb.toString();
     }
   }
@@ -116,16 +118,16 @@
    * @param sc
    * @param nextTimer time to call the timeEvent() callback
    */
-  public void setTimerEventTime(SelectorCallback sc, long nextTimer) {
-      sc.selectorData.nextTimeEvent = nextTimer;
+  public void setTimerEventTime(SelectorData selectorData, long nextTimer) {
+      selectorData.nextTimeEvent = nextTimer;
   }
   
-  public int readNonBlocking(SelectorCallback sc, ByteBuffer bb) 
+  public int readNonBlocking(SelectorData sc, ByteBuffer bb) 
       throws IOException {
     return 0;
   }
 
-  public int writeNonBlocking(SelectorCallback sc, ByteBuffer reqBuf) 
+  public int writeNonBlocking(SelectorData sc, ByteBuffer reqBuf) 
       throws IOException {
     return 0;
   }
@@ -133,7 +135,7 @@
   /** 
    * 
    */
-  public int close(SelectorCallback sc) throws IOException {
+  public int close(SelectorData sc) throws IOException {
     return 0;
   }
   
@@ -149,6 +151,9 @@
   { 
   }
 
+  public void ioThreadRun(SelectorData sdata) throws IOException {
+  }
+  
   /**
    * For use with daemon tools, inetd - the server socket will be created 
    * externally and passed as a file descriptor. This is needed to run on
@@ -160,13 +165,20 @@
   /** 
    * Change the callback associated with the socket.
    */
-  public void updateCallback(SelectorCallback old, SelectorCallback sc) {
+  public void updateCallback(SelectorData sdata, SelectorCallback old, SelectorCallback sc) {
   }
   
-  public void writeInterest(SelectorCallback sc, boolean writeInterest) {
+  public void writeInterest(SelectorData sc, boolean writeInterest) {
   }
 
-  public void readInterest(SelectorCallback sc, boolean readInterest) {
+  public void readInterest(SelectorData sc, boolean readInterest) throws IOException {
+  }
+  
+  public int getPort(SelectorData sd, boolean remote) {
+      return 0;
   }
   
+  public InetAddress getAddress(SelectorData sd, boolean remote) {
+      return null;
+  }
 }
\ No newline at end of file

Modified: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java?rev=667817&r1=667816&r2=667817&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java Sat Jun 14 08:36:49 2008
@@ -48,14 +48,18 @@
  */
 public class SelectorThreadNio extends SelectorThread implements Runnable {
 
-    static Logger log = Logger.getLogger("SelectorThreadNio");
+    static Logger log = Logger.getLogger("SelNio");
 
     Selector selector;
 
-    ArrayList<SelectorCallback> readInterest = new ArrayList<SelectorCallback>();
-    ArrayList<SelectorCallback> writeInterest = new ArrayList<SelectorCallback>();
-    ArrayList<SelectorCallback> connectAcceptInterest = 
-      new ArrayList<SelectorCallback>();
+    ArrayList<SelectorData> readInterest = new ArrayList<SelectorData>();
+    ArrayList<SelectorData> writeInterest = new ArrayList<SelectorData>();
+    ArrayList<SelectorData> connectAcceptInterest = 
+        new ArrayList<SelectorData>();
+    ArrayList<SelectorData> updateCallback = 
+        new ArrayList<SelectorData>();
+    ArrayList<SelectorData> runInterest = 
+        new ArrayList<SelectorData>();
 
     AtomicInteger opened = new AtomicInteger();
     AtomicInteger closed = new AtomicInteger();
@@ -63,7 +67,7 @@
 
     // actives are also stored in the Selector. This is only updated in the main 
     // thread
-    ArrayList<SelectorCallback> active = new ArrayList<SelectorCallback>();
+    ArrayList<SelectorData> active = new ArrayList<SelectorData>();
 
     boolean debug = false;
     boolean running = true;
@@ -74,7 +78,7 @@
 
     // Normally select will wait for the next time event - if it's 
     // too far in future, maxSleep will override it.
-    private long maxSleep = 1000;
+    private long maxSleep = 10000;
 
     // Never sleep less than minSleep. This defines the resulution for 
     // time events.
@@ -99,11 +103,6 @@
         }        
     }
 
-    public void setDaemon(boolean d) {
-        this.daemon = d;
-        
-    }
-
     public void setName(String n) {
         selectorThread.setName(n);
         Registry registry = Registry.getRegistry(null, null);
@@ -145,12 +144,17 @@
 
     public void stop() {
         running = false;
-        log.info("Selector thread stop " + this);
+        if (debug) {
+            log.info("Selector thread stop " + this);
+        }
         selector.wakeup();
     }
 
     public void run() {
-        log.info("Selector thread start " + this);
+        int sloops = 0;
+        if (debug) {
+            log.info("Start NIO thread, daemon=" + daemon);
+        }
         while (running) {
             // if we want timeouts - set here.
             try {
@@ -159,74 +163,135 @@
                 processPending();
 
                 long now = System.currentTimeMillis();
-                if (nextWakeup > 0 && nextWakeup < now) {
+                if (nextWakeup < now) {
                     // We don't want to iterate on every I/O
                     updateSleepTimeAndProcessTimeouts(now);
                 }
                 
                 int selected = selector.select(sleepTime);
                 lastWakeup = System.currentTimeMillis();
-
+                if (debug && selected == 0) {
+                    long delta = lastWakeup - now;
+                    if (delta < maxSleep - 1000) { // short wakeup
+                        log.info("Wakeup " + selected + " " + (delta));
+                    }
+                }
+                if (lastWakeup - now < 10 && selected == 0) {
+                    if (sloops > 50) {
+                        sloops = 0;
+                        log.severe("Looping !");
+                    }
+                    sloops++;
+                }
                 // handle events for existing req first.
                 if (selected != 0) {
+                    sloops = 0;
                     Set<SelectionKey> sel = selector.selectedKeys();
 
                     Iterator<SelectionKey> i = sel.iterator();
                     while (i.hasNext()) {
                         SelectionKey sk = i.next();
+                        i.remove();
                         // avoid dup - disable the interest
-                        // TODO: is this really needed ? 
-                        int readyOps = sk.readyOps();
-                        sk.interestOps(sk.interestOps() & ~readyOps);
+                        // TODO: is this really needed ?
+                        boolean valid = sk.isValid();
+                        int readyOps = (valid) ? sk.readyOps() : 0;
+                        if (debug) {
+                            log.info("Wakeup " + selected + " " + (lastWakeup - now) + 
+                                    " ready: " + readyOps + " " + 
+                                    sk.isValid() + " " + sk);
+                        }
+                        
+                        //sk.interestOps(sk.interestOps() & ~readyOps);
                         // Find the request receiving the notification
+                        
                         SelectorData sdata = (SelectorData) sk.attachment();
+                        //synchronized (sdata) {
+                        // either that or updateCallback in IO/Thread
                         SelectorCallback cstate = sdata.callback;
-                        if (debug) {
-                            log.info("SelectorThread: selected " + cstate + " " + readyOps);
+                            
+                        //}
+                        //checkChannelKey(cstate);
+                        if (sdata.selKey != sk || sdata.channelData != sk.channel()) {
+                        sdata.selKey = sk;
+                        sdata.channelData = sk.channel();
                         }
 
                         if (sk.isValid() && sk.isAcceptable()) {
                             handleAccept(cstate, sk);
+                            if (debug) {
+                                log.info("Wakeup done, accept" + selected 
+                                        + " " + (lastWakeup - now)   
+                                        + " ready: " + readyOps + " " 
+                                        + sk.readyOps() + " " + sk);
+                            }
                             continue;
                         }
 
                         SocketChannel sc = (SocketChannel) sk.channel();
                         if (!sk.isValid()) {
                             if (debug) {
-                                log.info("SelectorThread: !isValid, closed socket " + cstate);
+                                log.info("!isValid, closed socket " + cstate);
                             }
-                            close(sk, cstate, sc, null);
+                            close(sdata, sk, cstate, sc, null, true);
                             continue;
                         }
 
                         try {
                             // callbacks
                             if (sk.isValid() && sk.isConnectable()) {
-                                handleConnect(cstate, sc);
+                                // Only needed once
+                                sk.interestOps(sk.interestOps() 
+                                        & ~SelectionKey.OP_CONNECT);
+                                handleConnect(sdata, cstate, sc);
+                                if (debug) {
+                                    log.info("Wakeup done, connect" + selected 
+                                            + " " + (lastWakeup - now)   
+                                            + " ready: " + readyOps + " " 
+                                            + sk.readyOps() + " " + sk);
+                                }
                             }
 
                             if (sk.isValid() && sk.isWritable()) {
-                                cstate.selectorData.lastWriteResult = 0;
-                                cstate.dataWriteable(this);
-                                if (cstate.selectorData.lastWriteResult > 0 && 
-                                        cstate.selectorData.writeInterest) {
+                                // Needs to be explicitely re-enabled
+                                sk.interestOps(sk.interestOps() 
+                                        & ~SelectionKey.OP_WRITE);
+                                sdata.lastWriteResult = 0;
+                                if (debug) {
+                                    log.info("dataWritable " + selected 
+                                            + " " + (lastWakeup - now)   
+                                            + " ready: " + readyOps + " " 
+                                            + sk.readyOps() + " " + cstate + 
+                                            " " + sk);
+                                }
+                                cstate.dataWriteable(sdata);
+                                
+                                if (sdata.lastWriteResult > 0 && 
+                                        sdata.writeInterest) {
                                     log.warning("SelectorThread: write interest" +
                                                 " after incomplete write");
                                 }
                             }
 
                             if (sk.isReadable()) {
-                                cstate.selectorData.lastReadResult = 0;
-                                cstate.dataReceived(this);
-                                if (cstate.selectorData.lastReadResult > 0 && 
-                                        cstate.selectorData.readInterest) {
-                                    log.warning("SelectorThread: read interest" +
-                                                " after incomplete read");
+                                sdata.lastReadResult = 0;
+                                if (debug) {
+                                    log.info("dataReceived " + selected 
+                                            + " " + (lastWakeup - now)   
+                                            + " ready: " + readyOps + " " 
+                                            + sk.readyOps() + " " + cstate + 
+                                            " " + sk);
                                 }
+                                cstate.dataReceived(sdata);
+//                                if (cstate.selectorData.lastReadResult > 0 && 
+//                                        cstate.selectorData.readInterest) {
+//                                    log.warning("SelectorThread: read interest" +
+//                                                " after incomplete read");
+//                                }
                             }
                         } catch (Throwable t) {
                             t.printStackTrace();
-                            close(sk, cstate, sc, t);
+                            close(sdata, sk, cstate, sc, t, true);
                         }
 
                     }
@@ -240,12 +305,18 @@
         } // while(running)
     }
 
-    private void handleConnect(SelectorCallback cstate, SocketChannel sc)
+    private void handleConnect(SelectorData sdata, SelectorCallback cstate, SocketChannel sc)
             throws IOException, SocketException {
-        sc.finishConnect();
+        if (!sc.finishConnect()) {
+            log.warning("handleConnected - finishConnect returns false");
+        }
+        sdata.sel = this;
         sc.socket().setSoLinger(true, 0);
-        cstate.connected(this);
-        readInterest(cstate, true);
+        if (debug) {
+            log.info("connected() " + cstate);
+        }
+        cstate.connected(sdata);
+        readInterest(sdata, true);
     }
 
     private void handleAccept(SelectorCallback cstate, SelectionKey sk)
@@ -253,103 +324,94 @@
         SelectableChannel selc = sk.channel();
         ServerSocketChannel ssc=(ServerSocketChannel)selc;
         SocketChannel sockC = ssc.accept();
-//                            if (sockC == null) {
-//                                continue;
-//                            }
         sockC.configureBlocking(false);
-        SelectorCallback acb = cstate.connectionAccepted(this, sockC);
-        acb.selectorData.selKey = sockC.register(selector, 
-            SelectionKey.OP_READ, 
-            acb.selectorData);
-        acb.selectorData.channelData = sockC;
-        acb.selectorData.sel = this;
-        active.add(acb);
+        SelectorData selectorData = new SelectorData(this);
+        selectorData.selKey = sockC.register(selector, 
+                SelectionKey.OP_READ, 
+                selectorData);
+        selectorData.channelData = sockC;
         
-        sk.interestOps(sk.interestOps() | SelectionKey.OP_ACCEPT);
-    }
-
-    public void updateCallback(SelectorCallback old, 
-                               SelectorCallback cstate) {
-        cstate.selectorData = old.selectorData;
-        cstate.selectorData.callback = cstate;
-        // leave old.selectorData around in case some thread is still using it
-    }
-
-    public void writeInterest(SelectorCallback cstate, boolean b) {
-        if (b && (cstate.selectorData.interest | SelectionKey.OP_WRITE) != 0) {
+        // Find the callback for the new socket
+        SelectorCallback acb = 
+            cstate.connectionAccepted(selectorData, sockC);
+        selectorData.callback = acb;
+        if (acb == null) {
+            log.severe("No callback for socket " + selectorData);
+            close(selectorData);
             return;
         }
-        if (!b && (cstate.selectorData.interest | SelectionKey.OP_WRITE) == 0) {
-            return;
+        
+        synchronized (active) {
+            active.add(selectorData);
         }
-        if (Thread.currentThread() == selectorThread) {
-            cstate.selectorData.writeInterest = b;
-            if (cstate.selectorData.writeInterest) {
-                cstate.selectorData.interest = 
-                    cstate.selectorData.interest | SelectionKey.OP_WRITE;
-            } else {
-                cstate.selectorData.interest = 
-                    cstate.selectorData.interest & ~SelectionKey.OP_WRITE;                
-            }
-            SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;            
-            sk.interestOps(cstate.selectorData.interest);
-            return;
+        //sk.interestOps(sk.interestOps() | SelectionKey.OP_ACCEPT);
+        if (debug) {
+            log.info("handleAccept " + cstate);
         }
-        if (!b) {
-            return; // can't remove interest from regular thread
-        }
-        synchronized (writeInterest) {
-            writeInterest.add(cstate);
+    }
+
+    /**
+     * Tricky: we have a socket ( current.selectorData ), we want
+     * it to be associated with a different selectorData.callback, but
+     * what if the IO thread is just making a callback on that 
+     * selectorData ? 
+     * 
+     * We could do it in the IO/thread, or make sure it is atomic.
+     *  
+     */
+    @Override
+    public void updateCallback(SelectorData sdata, 
+                               SelectorCallback current, 
+                               SelectorCallback cstate) {
+        sdata.pendingCallback = cstate;
+        if (isSelectorThread()) {
+            updateCallbackIOT(sdata);
+        } else {
+            synchronized (updateCallback) {
+                updateCallback.add(sdata);
+            }
         }
-        selector.wakeup();
     }
     
-    public void readInterest(SelectorCallback cstate, boolean b) {
-        if (Thread.currentThread() == selectorThread) {
-            cstate.selectorData.readInterest = b; 
-            selThreadUpdateInterest(cstate);
-            log.info("Registering read interest");
-            return;
-        }
-        if (b && (cstate.selectorData.interest | SelectionKey.OP_READ) != 0) {
-            return;
+    private void processPendingUpdateCallback() {
+        if (updateCallback.size() > 0) {
+            synchronized (updateCallback) {
+                Iterator<SelectorData> ci = updateCallback.iterator();
+                while (ci.hasNext()) {
+                    SelectorData cstate = ci.next();
+                    updateCallbackIOT(cstate);
+                }
+                updateCallback.clear();
+            }
         }
-        if (!b && (cstate.selectorData.interest | SelectionKey.OP_READ) == 0) {
-            return;
+    }
+    
+    private void updateCallbackIOT(SelectorData sdata) {
+        if (debug) {
+            log.info("Callback update " + sdata.pendingCallback + " old=" + sdata.callback);
         }
-        // Schedule the interest update.
-        synchronized (readInterest) {
-            readInterest.add(cstate);
+        synchronized (sdata) {
+            sdata.callback = sdata.pendingCallback; 
+            sdata.pendingCallback = null;
         }
-        log.info("Registering pending read interest");
-        selector.wakeup();
     }
 
 
-    private void selThreadUpdateInterest(SelectorCallback cstate) {
-        SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;
-        if (sk != null && sk.isValid()) {
-            if (debug) {
-                log.info("SelectorThread: process pending: interest " + 
-                        cstate.selectorData.interest + " for " + cstate);
-            }
-            if (cstate.selectorData.readInterest) {
-                cstate.selectorData.interest = 
-                    cstate.selectorData.interest | SelectionKey.OP_READ;
-            } else {
-                cstate.selectorData.interest = 
-                    cstate.selectorData.interest & ~SelectionKey.OP_READ;                
-            }
-            sk.interestOps(cstate.selectorData.interest);
-        }
-    }
-    
-    private void close(SelectionKey sk,
+    private void close(SelectorData sdata, 
+                       SelectionKey sk,
                        SelectorCallback cstate,
                        Channel ch, 
-                       Throwable ex) {
+                       Throwable ex, boolean remove) {
         try {
-            sk.cancel();
+            if (debug) {
+                log.info("-------------> close: " + cstate + " t=" + ex);
+            }
+            if (sk != null && sk.isValid()) {
+                sk.interestOps(0);
+            }
+            if (sk != null) { 
+                sk.cancel();
+            }
             if (ch instanceof SocketChannel) {
                 SocketChannel sc = (SocketChannel) ch;
                 if (sc.isConnected()) {
@@ -368,8 +430,12 @@
             }
             ch.close();
             closed.incrementAndGet();
-            cstate.channelClosed(this, ex);
-            active.remove(cstate);
+            cstate.channelClosed(sdata, ex);
+            if (remove) {
+                synchronized (active) {
+                    active.remove(cstate);
+                }
+            }
         } catch (IOException ex2) {
             log.severe("SelectorThread: Error closing socket " + ex2);
             ex2.printStackTrace();
@@ -379,39 +445,117 @@
     // --------------- Socket op abstractions ------------
 
     @Override
-    public int readNonBlocking(SelectorCallback cstate, ByteBuffer bb) 
+    public int readNonBlocking(SelectorData selectorData, ByteBuffer bb) 
     throws IOException {
-        int done = ((SocketChannel) cstate.selectorData.channelData).read(bb);
-        cstate.selectorData.lastReadResult = done;
-        if (done < 0) {
+        try {
+            int off = bb.position();
+            int done = ((SocketChannel) selectorData.channelData).read(bb);
             if (debug) {
-                log.info("SelectorThread: EOF while reading");
+                log.info("-------------readNB rd=" + done + " bb.limit=" + 
+                        bb.limit() + " pos=" + bb.position() + " " + selectorData.callback);
             }
+            if (done > 0) {
+                if (debug) {
+                    String s = new String(bb.array(), off,
+                            bb.position() - off);
+                    log.info("Data:\n" + s);
+                }
+                if (done + off != bb.position()) {
+                    System.err.println("XXX");
+                }
+                selectorData.zeroReads = 0;
+            } else if (done < 0) {
+                if (debug) {
+                    log.info("SelectorThread: EOF while reading");
+                }
+            } else {
+                // need more...
+                if (selectorData.lastReadResult == 0) {
+                    selectorData.zeroReads++;
+                    if (selectorData.zeroReads > 6) {
+                        log.severe("Double 0 reading ");
+                        close(selectorData);
+                        return -1;
+                    }
+                }
+            }
+            selectorData.lastReadResult = done;
+            return done;
+        } catch(IOException ex) {
+            if (debug) {
+                log.info("readNB error rd=" + -1 + " bblen=" + 
+                        (bb.limit() - bb.position()) + " " + selectorData.callback + " " + ex);
+            }
+            close(selectorData);
+            return -1;
         }
-        return done;
     }
 
     @Override
-    public int writeNonBlocking(SelectorCallback cstate, ByteBuffer bb) 
-        throws IOException {
-        int done = ((SocketChannel) cstate.selectorData.channelData).write(bb);
-        cstate.selectorData.lastWriteResult = done;
-        return done;
+    public int writeNonBlocking(SelectorData selectorData, ByteBuffer bb) 
+            throws IOException {
+        try {
+            if (debug) {
+                log.info("writeNB pos=" + bb.position() + " len=" + 
+                        (bb.limit() - bb.position()) + " " + selectorData.callback);
+            }
+            if (debug) {
+                String s = new String(bb.array(), bb.position(),
+                        bb.limit() - bb.position());
+                log.info("Data:\n" + s);
+            }
+            int done = ((SocketChannel) selectorData.channelData).write(bb);
+            selectorData.lastWriteResult = done;
+            return done;
+        } catch(IOException ex) {
+            if (debug) {
+                log.info("writeNB error pos=" + bb.position() + " len=" + 
+                        (bb.limit() - bb.position()) + " " + selectorData.callback + " " + 
+                        ex);
+            }
+            close(selectorData);
+            return -1;
+        }
+    }
+
+    public int getPort(SelectorData sd, boolean remote) {
+        SocketChannel socketChannel = (SocketChannel) sd.channelData;        
+        
+        if (remote) {
+            return socketChannel.socket().getPort();
+        } else {
+            return socketChannel.socket().getLocalPort();
+        }
+    }
+    
+    public InetAddress getAddress(SelectorData sd, boolean remote) {
+        SocketChannel socketChannel = (SocketChannel) sd.channelData;        
+        
+        if (remote) {
+            return socketChannel.socket().getInetAddress();
+        } else {
+            return socketChannel.socket().getLocalAddress();
+        }
     }
 
     /** 
      */
     @Override
     public void connect(String host, int port, SelectorCallback cstate) 
-    throws IOException {
+            throws IOException {
         SocketChannel socketChannel = SocketChannel.open();
         socketChannel.configureBlocking(false);
-        int o = opened.incrementAndGet();
+        SelectorData selectorData = new SelectorData(this);
+        selectorData.sel = this;
+        selectorData.callback = cstate;
+        selectorData.channelData = socketChannel;
+        selectorData.channelData = socketChannel; // no key
+        
         socketChannel.connect(new InetSocketAddress(host,  port));
-        cstate.selectorData.channelData = socketChannel;
-        cstate.selectorData.interest = SelectionKey.OP_CONNECT;
+        opened.incrementAndGet();
+        
         synchronized (connectAcceptInterest) {
-            connectAcceptInterest.add(cstate);
+            connectAcceptInterest.add(selectorData);
         }
         selector.wakeup();
     }
@@ -424,14 +568,14 @@
     }
 
     // TODO
-    public void setSocketOptions(SelectorCallback cstate,
+    public void setSocketOptions(SelectorData selectorData,
                                  int linger, 
                                  boolean tcpNoDelay,
                                  int socketTimeout)
     throws IOException {
 
         SocketChannel socketChannel = 
-            (SocketChannel) cstate.selectorData.channelData;
+            (SocketChannel) selectorData.channelData;
         Socket socket = socketChannel.socket();
 
         if(linger >= 0 ) 
@@ -443,14 +587,16 @@
     }
 
     @Override
-    public int close(SelectorCallback cstate) throws IOException {
-        close((SelectionKey) cstate.selectorData.selKey, cstate, 
-              cstate.selectorData.channelData, null);
+    public int close(SelectorData selectorData) throws IOException {
+        close(selectorData,
+                (SelectionKey) selectorData.selKey, selectorData.callback, 
+                selectorData.channelData, null, true);
         return 0;
     }
 
 
 
+    @Override
     public void acceptor(SelectorCallback cstate, 
                          int port, 
                          InetAddress inet, 
@@ -475,28 +621,43 @@
 
         ssc.configureBlocking(false);
 
-        cstate.selectorData.channelData = ssc;
-        cstate.selectorData.interest = SelectionKey.OP_ACCEPT;
-        // cstate must return 'OP_ACCEPT' as interest
+        SelectorData selectorData = new SelectorData(this);
+        selectorData.channelData = ssc; // no key yet
+        selectorData.callback = cstate; 
+        // key will be set in pending
+        
+        
         synchronized (connectAcceptInterest) {
-            connectAcceptInterest.add(cstate);
+            connectAcceptInterest.add(selectorData);
         }
         selector.wakeup();
     }
+    
+    public void ioThreadRun(SelectorData sdata) throws IOException {
+        if (isSelectorThread()) {
+            sdata.callback.ioThreadRun(sdata);
+        } else {
+            synchronized (runInterest) {
+                runInterest.add(sdata);
+            }
+            selector.wakeup();
+        }
+    }
 
+    @Override
     public void inetdAcceptor(SelectorCallback cstate) throws IOException {
         SelectorProvider sp=SelectorProvider.provider();
 
         Channel ch=sp.inheritedChannel();
         if(ch!=null ) {
-            //("Inherited: " + ch.getClass().getName());
+            log.info("Inherited: " + ch.getClass().getName());
             // blocking mode
             ServerSocketChannel ssc=(ServerSocketChannel)ch;
-            cstate.selectorData.channelData = ssc;
-            cstate.selectorData.interest = SelectionKey.OP_ACCEPT;
-            // cstate must return 'OP_ACCEPT' as interest
+            SelectorData selectorData = new SelectorData(this);
+            selectorData.channelData = ssc;
+            
             synchronized (connectAcceptInterest) {
-                connectAcceptInterest.add(cstate);
+                connectAcceptInterest.add(selectorData);
             }
             selector.wakeup();
         }
@@ -508,31 +669,45 @@
      *  smallest timeout
      * @throws IOException 
      */
-    void updateSleepTimeAndProcessTimeouts(long now) throws IOException {
+    synchronized void updateSleepTimeAndProcessTimeouts(long now) 
+            throws IOException {
         long min = Long.MAX_VALUE;
         // TODO: test with large sets, maybe sort
-        Iterator<SelectorCallback> activeIt = active.iterator();
-        while(activeIt.hasNext()) {
-          SelectorCallback cstate = activeIt.next();
-          if (! cstate.selectorData.channelData.isOpen()) {
-            log.info("Closed socket " + cstate.selectorData.channelData);
-            activeIt.remove();
-            close(cstate); // generate callback, increment counters.
-          }
-          
-          long t = cstate.selectorData.nextTimeEvent;
-          if (t == 0) {
-            continue;
-          }
-          if (t < now) {
-            // Timeout
-            cstate.timeEvent(this);
-            // TODO: make sure this is updated if it was selected
-            continue;
-          }
-          if (t < min) {
-            min = t;
-          }
+        synchronized (active) {
+            Iterator<SelectorData> activeIt = active.iterator();
+
+            while(activeIt.hasNext()) {
+                SelectorData selectorData = activeIt.next();
+                if (! selectorData.channelData.isOpen()) {
+                    if (debug) {
+                        log.info("Found closed socket, removing " + 
+                                selectorData.channelData);
+                    }
+                    activeIt.remove();
+                    close(selectorData, 
+                            (SelectionKey) selectorData.selKey, selectorData.callback, 
+                            selectorData.channelData, null, false); // generate callback, increment counters.
+                }
+
+                long t = selectorData.nextTimeEvent;
+                if (t == 0) {
+                    continue;
+                }
+                if (t < now) {
+                    // Timeout
+                    if (debug) {
+                        log.info("Time event " + selectorData.callback);
+                    }
+                    if (selectorData.callback != null) {
+                        selectorData.callback.timeEvent(selectorData);
+                    }
+                    // TODO: make sure this is updated if it was selected
+                    continue;
+                }
+                if (t < min) {
+                    min = t;
+                }
+            }
         }
         long nextSleep = min - now;
         if (nextSleep > maxSleep) {
@@ -542,38 +717,179 @@
         } else {
             sleepTime = nextSleep;
         }
+        if (sleepTime == 0) {
+            System.err.println("XXX");
+        }
         nextWakeup = now + sleepTime;
     }
 
-    private void processPending() throws IOException {
+    @Override    
+    public void writeInterest(SelectorData selectorData, boolean b) {
+        SelectionKey sk = (SelectionKey) selectorData.selKey;
+        if (!sk.isValid()) {
+            return;
+        }
+        int interest = sk.interestOps();
+        if (b && (interest & SelectionKey.OP_WRITE) != 0) {
+            return;
+        }
+        if (!b && (interest & SelectionKey.OP_WRITE) == 0) {
+            return;
+        }
+        if (Thread.currentThread() == selectorThread) {
+            selectorData.writeInterest = b;
+            if (selectorData.writeInterest) {
+                interest = 
+                    interest | SelectionKey.OP_WRITE;
+            } else {
+                interest = 
+                    interest & ~SelectionKey.OP_WRITE;                
+            }
+            if (interest == 0) {
+                log.warning("No interest " + selectorData.callback);
+            } else {
+                sk.interestOps(interest);                
+            }
+            if (debug) {
+                log.info("Write interest " + selectorData.callback + " i=" + interest);
+            }
+            return;
+        }
+        if (!b) {
+            return; // can't remove interest from regular thread
+        }
+        selectorData.writeInterest = b;
+        if (debug) {
+            log.info("Pending write interest " + selectorData.callback);
+        }
+        synchronized (writeInterest) {
+            writeInterest.add(selectorData);
+        }
+        selector.wakeup();
+    }
+    
+    
+    @Override
+    public void readInterest(SelectorData selectorData, boolean b) throws IOException {
+        if (Thread.currentThread() == selectorThread) {
+            selectorData.readInterest = b; 
+            selThreadReadInterest(selectorData);
+            return;
+        }
+        SelectionKey sk = (SelectionKey) selectorData.selKey;
+        int interest = sk.interestOps();
+        selectorData.readInterest = b;
+        if (b && (interest & SelectionKey.OP_READ) != 0) {
+            return;
+        }
+        if (!b && (interest & SelectionKey.OP_READ) == 0) {
+            return;
+        }
+        // Schedule the interest update.
+        synchronized (readInterest) {
+            readInterest.add(selectorData);
+        }
+        if (debug) {
+            log.info("Registering pending read interest");
+        }
+        selector.wakeup();
+    }
+
+
+    private void selThreadReadInterest(SelectorData selectorData) throws IOException {
+        SelectionKey sk = (SelectionKey) selectorData.selKey;
+        if (sk == null) {
+            if (selectorData.readInterest) {
+                if (debug) {
+                    log.info("Register again for read interest");
+                }
+                SocketChannel socketChannel = 
+                    (SocketChannel) selectorData.channelData;
+                if (socketChannel.isOpen()) {
+                    selectorData.sel = this;
+                    selectorData.selKey = 
+                        socketChannel.register(selector, 
+                                SelectionKey.OP_READ, selectorData);
+                    selectorData.channelData = socketChannel;
+                }
+            }
+            return;
+        }
+        if (!sk.isValid()) {
+            return;
+        }
+        int interest = sk.interestOps();
+        if (sk != null && sk.isValid()) {
+            if (selectorData.readInterest) {
+//                if ((interest | SelectionKey.OP_READ) != 0) {
+//                    return;
+//                }
+                interest = 
+                    interest | SelectionKey.OP_READ;
+            } else {
+//                if ((interest | SelectionKey.OP_READ) == 0) {
+//                    return;
+//                }
+                interest = 
+                    interest & ~SelectionKey.OP_READ;                
+            }
+            if (interest == 0) {
+                log.warning("No interest(rd removed) " + selectorData.callback);
+                // TODO: should we remove it ? It needs to be re-activated
+                // later.
+                sk.cancel(); //??
+                selectorData.selKey = null;
+            } else {
+                sk.interestOps(interest);
+            }
+            if (debug) {
+                log.info(((selectorData.readInterest) 
+                        ? "RESUME read " : "SUSPEND read ") 
+                        + selectorData.callback);
+            }
+        }
+    }
+    
+
+    private void processPendingConnectAccept() throws IOException {
         synchronized (connectAcceptInterest) {
-            Iterator<SelectorCallback> ci = connectAcceptInterest.iterator();
+            Iterator<SelectorData> ci = connectAcceptInterest.iterator();
 
             while (ci.hasNext()) {
-                SelectorCallback cstate = ci.next();
-                SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;
+                SelectorData selectorData = ci.next();
+                SelectionKey sk = (SelectionKey) selectorData.selKey;
                 
                 // Find host, port - initiate connection
                 try {
                     // Accept interest ?
-                    if (cstate.selectorData.channelData instanceof ServerSocketChannel) {
+                    if (selectorData.channelData instanceof ServerSocketChannel) {
                         ServerSocketChannel socketChannel = 
-                            (ServerSocketChannel) cstate.selectorData.channelData;
-                        cstate.selectorData.sel = this;
-                        cstate.selectorData.selKey = 
+                            (ServerSocketChannel) selectorData.channelData;
+                        selectorData.sel = this;
+                        selectorData.selKey = 
                           socketChannel.register(selector, 
-                              SelectionKey.OP_ACCEPT, cstate.selectorData);
+                              SelectionKey.OP_ACCEPT, selectorData);
                         
-                        cstate.selectorData.channelData = socketChannel;
-                        active.add(cstate);
+                        selectorData.channelData = socketChannel;
+                        synchronized (active) {
+                            active.add(selectorData);
+                        }
+                        if (debug) {
+                            log.info("Pending acceptor added: " + selectorData.callback);
+                        }
                     } else {
                         SocketChannel socketChannel =
-                          (SocketChannel) cstate.selectorData.channelData;
-                        cstate.selectorData.sel = this;
-                        cstate.selectorData.selKey = 
+                          (SocketChannel) selectorData.channelData;
+                        selectorData.sel = this;
+                        selectorData.selKey = 
                           socketChannel.register(selector, 
-                              SelectionKey.OP_CONNECT, cstate.selectorData);              
-                        active.add(cstate);
+                              SelectionKey.OP_CONNECT, selectorData);
+                        synchronized (active) {
+                            active.add(selectorData);
+                        }
+                        if (debug) {
+                            log.info("Pending connect added: " + selectorData.callback);
+                        }
                     }
                 } catch (IOException e) {
                     e.printStackTrace();
@@ -581,27 +897,59 @@
             }
             connectAcceptInterest.clear();
         }
+    }
+    
+    private void processPending() throws IOException {
+        processPendingConnectAccept();
+        processPendingReadWrite();
         
+        if (runInterest.size() > 0) {
+            synchronized (runInterest) {
+                Iterator<SelectorData> ci = runInterest.iterator();
+                while (ci.hasNext()) {
+                    SelectorData cstate = ci.next();
+                    try {
+                        cstate.callback.ioThreadRun(cstate);
+                    } catch (Throwable t) {
+                        t.printStackTrace();
+                    }
+                    if (debug) {
+                        log.info("Run in selthread: " + cstate.callback);
+                    }
+                }
+                runInterest.clear();
+            }
+        }
+        processPendingUpdateCallback();
+    }
+
+    private void processPendingReadWrite() throws IOException {
         // Update interest 
         if (readInterest.size() > 0) {
             synchronized (readInterest) {
-                Iterator<SelectorCallback> ci = readInterest.iterator();
+                Iterator<SelectorData> ci = readInterest.iterator();
                 while (ci.hasNext()) {
-                    SelectorCallback cstate = ci.next();
-                    selThreadUpdateInterest(cstate);
+                    SelectorData cstate = ci.next();
+                    selThreadReadInterest(cstate);
+                    if (debug) {
+                        log.info("Read interest added: " + cstate);
+                    }
                 }
                 readInterest.clear();
             }
         }
         if (writeInterest.size() > 0) {
             synchronized (writeInterest) {
-                Iterator<SelectorCallback> ci = writeInterest.iterator();
+                Iterator<SelectorData> ci = writeInterest.iterator();
                 while (ci.hasNext()) {
-                    SelectorCallback cstate = ci.next();
+                    SelectorData cstate = ci.next();
                     // Fake callback - will update as side effect
-                    cstate.dataWriteable(this);
+                    cstate.callback.dataWriteable(cstate);
+                    if (debug) {
+                        log.info("Write interest, calling dataWritable: " + cstate);
+                    }
                 }
-                readInterest.clear();
+                writeInterest.clear();
             }
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org