You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2008/04/23 19:30:00 UTC

svn commit: r650945 - in /tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net: SelectorCallback.java SelectorThread.java SelectorThreadNio.java

Author: costin
Date: Wed Apr 23 10:29:58 2008
New Revision: 650945

URL: http://svn.apache.org/viewvc?rev=650945&view=rev
Log:
Ok, finally - the first part of the new connector, or the last part of tomcat-lite experiment :-)
It is obviously quite independent of the rest of tomcat-lite, probably the last to move out of sandbox.

I have an Apr impl as well, but it's broken now, need to fix it again. 

This is the IO abstraction - the connector goal is to do all I/O in non-blocking mode and allow 
completely non-blocking adapters ( i.e. a bit more than regular coyote ).


Added:
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java   (with props)
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java   (with props)
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java   (with props)

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java?rev=650945&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java Wed Apr 23 10:29:58 2008
@@ -0,0 +1,87 @@
+/*  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+
+/**
+ * Notiy user code of events. All methods are called from the selector thread,
+ * they should not block. The reason is to allow parsing and non-blocking 
+ * processing to be done as fast as possible, and avoid the need for 
+ * SelectorThread to deal with a thread pool.
+ * 
+ * This class wraps the Channel. For APR we wrap the socket and few other
+ * fields we need for non-blocking operation in a ByteChannel, the code
+ * seems cleaner and it's nice to be able to use APR more portably.
+ * ( older version used long - but non-blocking connect needs a second param )
+ */
+public class SelectorCallback {
+  protected SelectorThread.SelectorData selectorData = new SelectorThread.SelectorData(this);
+  
+  public SelectorThread getSelector() {
+    return selectorData.sel;
+  }
+  
+  /** 
+   * Called when the protocol is connected.
+   */
+  public void connected(SelectorThread selThread) 
+          throws IOException {
+  }
+
+  /**
+   * It is possible to write data. 
+   * For both read and write - re-enable interest if you want more data. 
+   */
+  public void dataWriteable(SelectorThread selThread) throws IOException {
+  }
+
+  /**
+   * Data available for read.
+   * For both read and write - re-enable interest if you want more data. 
+   */
+  public void dataReceived(SelectorThread selThread) throws IOException {
+  }
+  
+  /** 
+   * nextTimeEvent reached. 
+   */
+  public void timeEvent(SelectorThread selThread) {
+  }
+
+  /** 
+   * Close was detected, or an unhandled exception happened while processing
+   * this callback.
+   */
+  public void channelClosed(SelectorThread selThread, Throwable ex) {
+  }
+  
+  /**
+   * Called on a callback created with acceptor() or inetdAcceptor() when 
+   * a new connection is accepted. 
+   * 
+   * @return callback to use on the new channel.
+   * 
+   * TODO: is there any case where something else besides registering read
+   * interest on the new connection is needed ? Maybe it could read some data ?
+   */
+  public SelectorCallback connectionAccepted(SelectorThread selThread, 
+                                             Channel sockC) {
+    return null;
+  }
+
+}
\ No newline at end of file

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java?rev=650945&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java Wed Apr 23 10:29:58 2008
@@ -0,0 +1,172 @@
+/*  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+
+import org.apache.tomcat.util.buf.ByteChunk;
+
+/**
+ * Abstract NIO/APR to avoid some of the complexity and allow more code
+ * sharing and experiments. 
+ *
+ * SelectorThread provides non-blocking methods for read/write and generates 
+ * callbacks using SelectorCallback. It has no buffers of its own. 
+ * 
+ * TODO: add SSL support 
+ * 
+ * @author Costin Manolache
+ */
+public abstract class SelectorThread {
+  
+   /* Similar interfaces:
+    *  - Apr/Nio Endpoints in coyote
+    *  - twisted reactor 
+    *  - mina IoProcessor - also has an Apr/nio impl, ProtocolCallback->IoSession,
+    */
+  
+  /** 
+   * This is stored as the attachment in the selector.
+   */
+  static class SelectorData {
+    SelectorData(SelectorCallback selectorCallback) {
+      this.callback = selectorCallback;
+    }
+    
+    // APR long is wrapped in a ByteChannel as well - with few other longs.
+    Channel channelData;
+    
+    SelectorThread sel;
+    Object selKey;
+    SelectorCallback callback;
+    
+    // Current interest, used internally to avoid waking up if no change
+    // Also used for connect and accept.
+    int interest;
+    
+    // True if the callback wants to be notified of read/write
+    boolean writeInterest;
+    boolean readInterest;
+    
+    /** 
+     * If != 0 - the callback will be notified closely after this time.
+     * Used for timeouts. 
+     */
+    long nextTimeEvent = 0;
+    
+    // Saved to allow debug messages for bad interest/looping
+    int lastReadResult;
+    int lastWriteResult;
+    
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("SelData: ")
+            .append(writeInterest ? "W/" : "")
+            .append(readInterest ? "R/" : "").append(selKey).append("/")
+            .append(channelData);
+        return sb.toString();
+    }
+  }
+
+  // ----------- IO handling -----------
+  protected long inactivityTimeout = 5000;
+  protected Thread selectorThread;
+
+  public boolean isSelectorThread() {
+      return Thread.currentThread() == selectorThread;
+  }
+
+  
+  /** 
+   * Close all resources, stop accepting, stop the thread.
+   * The actual stop will happen in background.
+   */
+  public void stop() {
+  }
+
+  /**
+   * This may be blocking - involves host resolution, connect.
+   * If the IP address is provided - it shouldn't block.
+   */
+  public void connect(String host, int port, 
+                      SelectorCallback sc) throws IOException {
+    
+  }
+  
+  /**
+   * Request a timer event. The thread will generate the events at 
+   * a configurable interval - for example no more often than 0.5 sec.
+   *  
+   * @param sc
+   * @param nextTimer time to call the timeEvent() callback
+   */
+  public void setTimerEventTime(SelectorCallback sc, long nextTimer) {
+      sc.selectorData.nextTimeEvent = nextTimer;
+  }
+  
+  public int readNonBlocking(SelectorCallback sc, ByteBuffer bb) 
+      throws IOException {
+    return 0;
+  }
+
+  public int writeNonBlocking(SelectorCallback sc, ByteBuffer reqBuf) 
+      throws IOException {
+    return 0;
+  }
+
+  /** 
+   * 
+   */
+  public int close(SelectorCallback sc) throws IOException {
+    return 0;
+  }
+  
+  /**
+   * Create a new server socket, register the callback. 
+   */
+  public void acceptor(SelectorCallback sc, 
+                       int port, 
+                       InetAddress inet, 
+                       int backlog,
+                       int serverTimeout)
+      throws IOException 
+  { 
+  }
+
+  /**
+   * For use with daemon tools, inetd - the server socket will be created 
+   * externally and passed as a file descriptor. This is needed to run on
+   * port 80 without root.  
+   */
+  public void inetdAcceptor(SelectorCallback sc) throws IOException {
+  }
+  
+  /** 
+   * Change the callback associated with the socket.
+   */
+  public void updateCallback(SelectorCallback old, SelectorCallback sc) {
+  }
+  
+  public void writeInterest(SelectorCallback sc, boolean writeInterest) {
+  }
+
+  public void readInterest(SelectorCallback sc, boolean readInterest) {
+  }
+  
+}
\ No newline at end of file

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java?rev=650945&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java Wed Apr 23 10:29:58 2008
@@ -0,0 +1,608 @@
+/*  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.tomcat.util.modeler.Registry;
+
+/**
+ * NIO implementation.
+ * 
+ * @author Costin Manolache
+ */
+public class SelectorThreadNio extends SelectorThread implements Runnable {
+
+    static Logger log = Logger.getLogger("SelectorThreadNio");
+
+    Selector selector;
+
+    ArrayList<SelectorCallback> readInterest = new ArrayList<SelectorCallback>();
+    ArrayList<SelectorCallback> writeInterest = new ArrayList<SelectorCallback>();
+    ArrayList<SelectorCallback> connectAcceptInterest = 
+      new ArrayList<SelectorCallback>();
+
+    AtomicInteger opened = new AtomicInteger();
+    AtomicInteger closed = new AtomicInteger();
+    AtomicInteger loops = new AtomicInteger();
+
+    // actives are also stored in the Selector. This is only updated in the main 
+    // thread
+    ArrayList<SelectorCallback> active = new ArrayList<SelectorCallback>();
+
+    boolean debug = false;
+    boolean running = true;
+
+    long lastWakeup = System.currentTimeMillis(); // last time we woke
+    long sleepTime;
+    long nextWakeup; // next scheduled wakeup
+
+    // Normally select will wait for the next time event - if it's 
+    // too far in future, maxSleep will override it.
+    private long maxSleep = 1000;
+
+    // Never sleep less than minSleep. This defines the resulution for 
+    // time events.
+    private long minSleep = 100;
+
+    boolean daemon = true;
+
+    public SelectorThreadNio() {
+        this(false);
+    }
+
+    public SelectorThreadNio(boolean daemon) {
+        try {
+            selectorThread = new Thread(this);
+            selector = Selector.open();
+            // TODO: start it on-demand, close it when not in use
+            selectorThread.setDaemon(daemon);
+            this.daemon = daemon;
+            selectorThread.start();
+        } catch(IOException e) {
+            throw new RuntimeException(e);
+        }        
+    }
+
+    public void setDaemon(boolean d) {
+        this.daemon = d;
+        
+    }
+
+    public void setName(String n) {
+        selectorThread.setName(n);
+        Registry registry = Registry.getRegistry(null, null);
+        try {
+            registry.registerComponent(this, ":name=" + n, "SelectorThread");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Opened sockets, waiting for something ( close at least ) 
+     */
+    public int getOpened() {
+        return opened.get();
+    }
+
+    /**
+     * Closed - we're done with them. 
+     */
+    public int getClosed() {
+        return closed.get();
+    }
+
+    /** 
+     * How many times we looped
+     */
+    public int getLoops() {
+        return loops.get();
+    }
+
+    public long getLastWakeup() {
+        return lastWakeup;
+    }
+
+    public long getTimeSinceLastWakeup() {
+        return System.currentTimeMillis() - lastWakeup;
+    }
+
+    public void stop() {
+        running = false;
+        log.info("Selector thread stop " + this);
+        selector.wakeup();
+    }
+
+    public void run() {
+        log.info("Selector thread start " + this);
+        while (running) {
+            // if we want timeouts - set here.
+            try {
+                loops.incrementAndGet();
+                // Check if new requests were added
+                processPending();
+
+                long now = System.currentTimeMillis();
+                if (nextWakeup > 0 && nextWakeup < now) {
+                    // We don't want to iterate on every I/O
+                    updateSleepTimeAndProcessTimeouts(now);
+                }
+                
+                int selected = selector.select(sleepTime);
+                lastWakeup = System.currentTimeMillis();
+
+                // handle events for existing req first.
+                if (selected != 0) {
+                    Set<SelectionKey> sel = selector.selectedKeys();
+
+                    Iterator<SelectionKey> i = sel.iterator();
+                    while (i.hasNext()) {
+                        SelectionKey sk = i.next();
+                        // avoid dup - disable the interest
+                        // TODO: is this really needed ? 
+                        int readyOps = sk.readyOps();
+                        sk.interestOps(sk.interestOps() & ~readyOps);
+                        // Find the request receiving the notification
+                        SelectorData sdata = (SelectorData) sk.attachment();
+                        SelectorCallback cstate = sdata.callback;
+                        if (debug) {
+                            log.info("SelectorThread: selected " + cstate + " " + readyOps);
+                        }
+
+                        if (sk.isValid() && sk.isAcceptable()) {
+                            handleAccept(cstate, sk);
+                            continue;
+                        }
+
+                        SocketChannel sc = (SocketChannel) sk.channel();
+                        if (!sk.isValid()) {
+                            if (debug) {
+                                log.info("SelectorThread: !isValid, closed socket " + cstate);
+                            }
+                            close(sk, cstate, sc, null);
+                            continue;
+                        }
+
+                        try {
+                            // callbacks
+                            if (sk.isValid() && sk.isConnectable()) {
+                                handleConnect(cstate, sc);
+                            }
+
+                            if (sk.isValid() && sk.isWritable()) {
+                                cstate.selectorData.lastWriteResult = 0;
+                                cstate.dataWriteable(this);
+                                if (cstate.selectorData.lastWriteResult > 0 && 
+                                        cstate.selectorData.writeInterest) {
+                                    log.warning("SelectorThread: write interest" +
+                                                " after incomplete write");
+                                }
+                            }
+
+                            if (sk.isReadable()) {
+                                cstate.selectorData.lastReadResult = 0;
+                                cstate.dataReceived(this);
+                                if (cstate.selectorData.lastReadResult > 0 && 
+                                        cstate.selectorData.readInterest) {
+                                    log.warning("SelectorThread: read interest" +
+                                                " after incomplete read");
+                                }
+                            }
+                        } catch (Throwable t) {
+                            t.printStackTrace();
+                            close(sk, cstate, sc, t);
+                        }
+
+                    }
+                    // All at once
+                    sel.clear();
+                }
+
+            } catch (Throwable e) {
+                log.log(Level.SEVERE, "SelectorThread: Error in select", e);
+            }
+        } // while(running)
+    }
+
+    private void handleConnect(SelectorCallback cstate, SocketChannel sc)
+            throws IOException, SocketException {
+        sc.finishConnect();
+        sc.socket().setSoLinger(true, 0);
+        cstate.connected(this);
+        readInterest(cstate, true);
+    }
+
+    private void handleAccept(SelectorCallback cstate, SelectionKey sk)
+            throws IOException, ClosedChannelException {
+        SelectableChannel selc = sk.channel();
+        ServerSocketChannel ssc=(ServerSocketChannel)selc;
+        SocketChannel sockC = ssc.accept();
+//                            if (sockC == null) {
+//                                continue;
+//                            }
+        sockC.configureBlocking(false);
+        SelectorCallback acb = cstate.connectionAccepted(this, sockC);
+        acb.selectorData.selKey = sockC.register(selector, 
+            SelectionKey.OP_READ, 
+            acb.selectorData);
+        acb.selectorData.channelData = sockC;
+        acb.selectorData.sel = this;
+        active.add(acb);
+        
+        sk.interestOps(sk.interestOps() | SelectionKey.OP_ACCEPT);
+    }
+
+    public void updateCallback(SelectorCallback old, 
+                               SelectorCallback cstate) {
+        cstate.selectorData = old.selectorData;
+        cstate.selectorData.callback = cstate;
+        // leave old.selectorData around in case some thread is still using it
+    }
+
+    public void writeInterest(SelectorCallback cstate, boolean b) {
+        if (b && (cstate.selectorData.interest | SelectionKey.OP_WRITE) != 0) {
+            return;
+        }
+        if (!b && (cstate.selectorData.interest | SelectionKey.OP_WRITE) == 0) {
+            return;
+        }
+        if (Thread.currentThread() == selectorThread) {
+            cstate.selectorData.writeInterest = b;
+            if (cstate.selectorData.writeInterest) {
+                cstate.selectorData.interest = 
+                    cstate.selectorData.interest | SelectionKey.OP_WRITE;
+            } else {
+                cstate.selectorData.interest = 
+                    cstate.selectorData.interest & ~SelectionKey.OP_WRITE;                
+            }
+            SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;            
+            sk.interestOps(cstate.selectorData.interest);
+            return;
+        }
+        if (!b) {
+            return; // can't remove interest from regular thread
+        }
+        synchronized (writeInterest) {
+            writeInterest.add(cstate);
+        }
+        selector.wakeup();
+    }
+    
+    public void readInterest(SelectorCallback cstate, boolean b) {
+        if (Thread.currentThread() == selectorThread) {
+            cstate.selectorData.readInterest = b; 
+            selThreadUpdateInterest(cstate);
+            log.info("Registering read interest");
+            return;
+        }
+        if (b && (cstate.selectorData.interest | SelectionKey.OP_READ) != 0) {
+            return;
+        }
+        if (!b && (cstate.selectorData.interest | SelectionKey.OP_READ) == 0) {
+            return;
+        }
+        // Schedule the interest update.
+        synchronized (readInterest) {
+            readInterest.add(cstate);
+        }
+        log.info("Registering pending read interest");
+        selector.wakeup();
+    }
+
+
+    private void selThreadUpdateInterest(SelectorCallback cstate) {
+        SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;
+        if (sk != null && sk.isValid()) {
+            if (debug) {
+                log.info("SelectorThread: process pending: interest " + 
+                        cstate.selectorData.interest + " for " + cstate);
+            }
+            if (cstate.selectorData.readInterest) {
+                cstate.selectorData.interest = 
+                    cstate.selectorData.interest | SelectionKey.OP_READ;
+            } else {
+                cstate.selectorData.interest = 
+                    cstate.selectorData.interest & ~SelectionKey.OP_READ;                
+            }
+            sk.interestOps(cstate.selectorData.interest);
+        }
+    }
+    
+    private void close(SelectionKey sk,
+                       SelectorCallback cstate,
+                       Channel ch, 
+                       Throwable ex) {
+        try {
+            sk.cancel();
+            if (ch instanceof SocketChannel) {
+                SocketChannel sc = (SocketChannel) ch;
+                if (sc.isConnected()) {
+                    int o = opened.decrementAndGet();
+                    //System.err.println("Close socket, opened=" + o);
+                    try {
+                        sc.socket().shutdownInput();
+                    } catch(IOException io1) {
+                    }
+                    try {
+                        sc.socket().shutdownOutput(); // TCP end to the other side
+                    } catch(IOException io1) {
+                    }
+                    sc.socket().close();
+                }
+            }
+            ch.close();
+            closed.incrementAndGet();
+            cstate.channelClosed(this, ex);
+            active.remove(cstate);
+        } catch (IOException ex2) {
+            log.severe("SelectorThread: Error closing socket " + ex2);
+            ex2.printStackTrace();
+        }
+    }
+
+    // --------------- Socket op abstractions ------------
+
+    @Override
+    public int readNonBlocking(SelectorCallback cstate, ByteBuffer bb) 
+    throws IOException {
+        int done = ((SocketChannel) cstate.selectorData.channelData).read(bb);
+        cstate.selectorData.lastReadResult = done;
+        if (done < 0) {
+            if (debug) {
+                log.info("SelectorThread: EOF while reading");
+            }
+        }
+        return done;
+    }
+
+    @Override
+    public int writeNonBlocking(SelectorCallback cstate, ByteBuffer bb) 
+        throws IOException {
+        int done = ((SocketChannel) cstate.selectorData.channelData).write(bb);
+        cstate.selectorData.lastWriteResult = done;
+        return done;
+    }
+
+    /** 
+     */
+    @Override
+    public void connect(String host, int port, SelectorCallback cstate) 
+    throws IOException {
+        SocketChannel socketChannel = SocketChannel.open();
+        socketChannel.configureBlocking(false);
+        int o = opened.incrementAndGet();
+        socketChannel.connect(new InetSocketAddress(host,  port));
+        cstate.selectorData.channelData = socketChannel;
+        cstate.selectorData.interest = SelectionKey.OP_CONNECT;
+        synchronized (connectAcceptInterest) {
+            connectAcceptInterest.add(cstate);
+        }
+        selector.wakeup();
+    }
+
+    // TODO
+    public void configureSocket(ByteChannel ch,
+                                boolean noDelay) throws IOException {
+        SocketChannel sockC = (SocketChannel) ch;
+        sockC.socket().setTcpNoDelay(noDelay);
+    }
+
+    // TODO
+    public void setSocketOptions(SelectorCallback cstate,
+                                 int linger, 
+                                 boolean tcpNoDelay,
+                                 int socketTimeout)
+    throws IOException {
+
+        SocketChannel socketChannel = 
+            (SocketChannel) cstate.selectorData.channelData;
+        Socket socket = socketChannel.socket();
+
+        if(linger >= 0 ) 
+            socket.setSoLinger( true, linger);
+        if( tcpNoDelay )
+            socket.setTcpNoDelay(tcpNoDelay);
+        if( socketTimeout > 0 )
+            socket.setSoTimeout( socketTimeout );
+    }
+
+    @Override
+    public int close(SelectorCallback cstate) throws IOException {
+        close((SelectionKey) cstate.selectorData.selKey, cstate, 
+              cstate.selectorData.channelData, null);
+        return 0;
+    }
+
+
+
+    public void acceptor(SelectorCallback cstate, 
+                         int port, 
+                         InetAddress inet, 
+                         int backlog,
+                         int serverTimeout)
+    throws IOException 
+    {
+        ServerSocketChannel ssc=ServerSocketChannel.open();
+        ServerSocket serverSocket = ssc.socket();
+        SocketAddress sa = null;
+        if (inet == null) {
+            sa = new InetSocketAddress( port );
+        } else {
+            sa = new InetSocketAddress(inet, port);
+        }
+        if (backlog > 0) {
+            serverSocket.bind( sa , backlog);
+        }
+        if( serverTimeout >= 0 ) {
+            serverSocket.setSoTimeout( serverTimeout );
+        }
+
+        ssc.configureBlocking(false);
+
+        cstate.selectorData.channelData = ssc;
+        cstate.selectorData.interest = SelectionKey.OP_ACCEPT;
+        // cstate must return 'OP_ACCEPT' as interest
+        synchronized (connectAcceptInterest) {
+            connectAcceptInterest.add(cstate);
+        }
+        selector.wakeup();
+    }
+
+    public void inetdAcceptor(SelectorCallback cstate) throws IOException {
+        SelectorProvider sp=SelectorProvider.provider();
+
+        Channel ch=sp.inheritedChannel();
+        if(ch!=null ) {
+            //("Inherited: " + ch.getClass().getName());
+            // blocking mode
+            ServerSocketChannel ssc=(ServerSocketChannel)ch;
+            cstate.selectorData.channelData = ssc;
+            cstate.selectorData.interest = SelectionKey.OP_ACCEPT;
+            // cstate must return 'OP_ACCEPT' as interest
+            synchronized (connectAcceptInterest) {
+                connectAcceptInterest.add(cstate);
+            }
+            selector.wakeup();
+        }
+    }
+
+    // -------------- Housekeeping -------------
+    /**
+     *  Same as APR connector - iterate over tasks, get 
+     *  smallest timeout
+     * @throws IOException 
+     */
+    void updateSleepTimeAndProcessTimeouts(long now) throws IOException {
+        long min = Long.MAX_VALUE;
+        // TODO: test with large sets, maybe sort
+        Iterator<SelectorCallback> activeIt = active.iterator();
+        while(activeIt.hasNext()) {
+          SelectorCallback cstate = activeIt.next();
+          if (! cstate.selectorData.channelData.isOpen()) {
+            log.info("Closed socket " + cstate.selectorData.channelData);
+            activeIt.remove();
+            close(cstate); // generate callback, increment counters.
+          }
+          
+          long t = cstate.selectorData.nextTimeEvent;
+          if (t == 0) {
+            continue;
+          }
+          if (t < now) {
+            // Timeout
+            cstate.timeEvent(this);
+            // TODO: make sure this is updated if it was selected
+            continue;
+          }
+          if (t < min) {
+            min = t;
+          }
+        }
+        long nextSleep = min - now;
+        if (nextSleep > maxSleep) {
+            sleepTime = maxSleep;
+        } else if (nextSleep < minSleep) {
+            sleepTime = minSleep;
+        } else {
+            sleepTime = nextSleep;
+        }
+        nextWakeup = now + sleepTime;
+    }
+
+    private void processPending() throws IOException {
+        synchronized (connectAcceptInterest) {
+            Iterator<SelectorCallback> ci = connectAcceptInterest.iterator();
+
+            while (ci.hasNext()) {
+                SelectorCallback cstate = ci.next();
+                SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;
+                
+                // Find host, port - initiate connection
+                try {
+                    // Accept interest ?
+                    if (cstate.selectorData.channelData instanceof ServerSocketChannel) {
+                        ServerSocketChannel socketChannel = 
+                            (ServerSocketChannel) cstate.selectorData.channelData;
+                        cstate.selectorData.sel = this;
+                        cstate.selectorData.selKey = 
+                          socketChannel.register(selector, 
+                              SelectionKey.OP_ACCEPT, cstate.selectorData);
+                        
+                        cstate.selectorData.channelData = socketChannel;
+                        active.add(cstate);
+                    } else {
+                        SocketChannel socketChannel =
+                          (SocketChannel) cstate.selectorData.channelData;
+                        cstate.selectorData.sel = this;
+                        cstate.selectorData.selKey = 
+                          socketChannel.register(selector, 
+                              SelectionKey.OP_CONNECT, cstate.selectorData);              
+                        active.add(cstate);
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+            connectAcceptInterest.clear();
+        }
+        
+        // Update interest 
+        if (readInterest.size() > 0) {
+            synchronized (readInterest) {
+                Iterator<SelectorCallback> ci = readInterest.iterator();
+                while (ci.hasNext()) {
+                    SelectorCallback cstate = ci.next();
+                    selThreadUpdateInterest(cstate);
+                }
+                readInterest.clear();
+            }
+        }
+        if (writeInterest.size() > 0) {
+            synchronized (writeInterest) {
+                Iterator<SelectorCallback> ci = writeInterest.iterator();
+                while (ci.hasNext()) {
+                    SelectorCallback cstate = ci.next();
+                    // Fake callback - will update as side effect
+                    cstate.dataWriteable(this);
+                }
+                readInterest.clear();
+            }
+        }
+    }
+}
\ No newline at end of file

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org