You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2008/04/23 19:30:00 UTC
svn commit: r650945 - in
/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net:
SelectorCallback.java SelectorThread.java SelectorThreadNio.java
Author: costin
Date: Wed Apr 23 10:29:58 2008
New Revision: 650945
URL: http://svn.apache.org/viewvc?rev=650945&view=rev
Log:
Ok, finally - the first part of the new connector, or the last part of tomcat-lite experiment :-)
It is obviously quite independent of the rest of tomcat-lite, probably the last to move out of sandbox.
I have an Apr impl as well, but it's broken now, need to fix it again.
This is the IO abstraction - the connector goal is to do all I/O in non-blocking mode and allow
completely non-blocking adapters ( i.e. a bit more than regular coyote ).
Added:
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java (with props)
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java (with props)
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java (with props)
Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java?rev=650945&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java Wed Apr 23 10:29:58 2008
@@ -0,0 +1,87 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+
+/**
+ * Notiy user code of events. All methods are called from the selector thread,
+ * they should not block. The reason is to allow parsing and non-blocking
+ * processing to be done as fast as possible, and avoid the need for
+ * SelectorThread to deal with a thread pool.
+ *
+ * This class wraps the Channel. For APR we wrap the socket and few other
+ * fields we need for non-blocking operation in a ByteChannel, the code
+ * seems cleaner and it's nice to be able to use APR more portably.
+ * ( older version used long - but non-blocking connect needs a second param )
+ */
+public class SelectorCallback {
+ protected SelectorThread.SelectorData selectorData = new SelectorThread.SelectorData(this);
+
+ public SelectorThread getSelector() {
+ return selectorData.sel;
+ }
+
+ /**
+ * Called when the protocol is connected.
+ */
+ public void connected(SelectorThread selThread)
+ throws IOException {
+ }
+
+ /**
+ * It is possible to write data.
+ * For both read and write - re-enable interest if you want more data.
+ */
+ public void dataWriteable(SelectorThread selThread) throws IOException {
+ }
+
+ /**
+ * Data available for read.
+ * For both read and write - re-enable interest if you want more data.
+ */
+ public void dataReceived(SelectorThread selThread) throws IOException {
+ }
+
+ /**
+ * nextTimeEvent reached.
+ */
+ public void timeEvent(SelectorThread selThread) {
+ }
+
+ /**
+ * Close was detected, or an unhandled exception happened while processing
+ * this callback.
+ */
+ public void channelClosed(SelectorThread selThread, Throwable ex) {
+ }
+
+ /**
+ * Called on a callback created with acceptor() or inetdAcceptor() when
+ * a new connection is accepted.
+ *
+ * @return callback to use on the new channel.
+ *
+ * TODO: is there any case where something else besides registering read
+ * interest on the new connection is needed ? Maybe it could read some data ?
+ */
+ public SelectorCallback connectionAccepted(SelectorThread selThread,
+ Channel sockC) {
+ return null;
+ }
+
+}
\ No newline at end of file
Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java?rev=650945&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java Wed Apr 23 10:29:58 2008
@@ -0,0 +1,172 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+
+import org.apache.tomcat.util.buf.ByteChunk;
+
+/**
+ * Abstract NIO/APR to avoid some of the complexity and allow more code
+ * sharing and experiments.
+ *
+ * SelectorThread provides non-blocking methods for read/write and generates
+ * callbacks using SelectorCallback. It has no buffers of its own.
+ *
+ * TODO: add SSL support
+ *
+ * @author Costin Manolache
+ */
+public abstract class SelectorThread {
+
+ /* Similar interfaces:
+ * - Apr/Nio Endpoints in coyote
+ * - twisted reactor
+ * - mina IoProcessor - also has an Apr/nio impl, ProtocolCallback->IoSession,
+ */
+
+ /**
+ * This is stored as the attachment in the selector.
+ */
+ static class SelectorData {
+ SelectorData(SelectorCallback selectorCallback) {
+ this.callback = selectorCallback;
+ }
+
+ // APR long is wrapped in a ByteChannel as well - with few other longs.
+ Channel channelData;
+
+ SelectorThread sel;
+ Object selKey;
+ SelectorCallback callback;
+
+ // Current interest, used internally to avoid waking up if no change
+ // Also used for connect and accept.
+ int interest;
+
+ // True if the callback wants to be notified of read/write
+ boolean writeInterest;
+ boolean readInterest;
+
+ /**
+ * If != 0 - the callback will be notified closely after this time.
+ * Used for timeouts.
+ */
+ long nextTimeEvent = 0;
+
+ // Saved to allow debug messages for bad interest/looping
+ int lastReadResult;
+ int lastWriteResult;
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("SelData: ")
+ .append(writeInterest ? "W/" : "")
+ .append(readInterest ? "R/" : "").append(selKey).append("/")
+ .append(channelData);
+ return sb.toString();
+ }
+ }
+
+ // ----------- IO handling -----------
+ protected long inactivityTimeout = 5000;
+ protected Thread selectorThread;
+
+ public boolean isSelectorThread() {
+ return Thread.currentThread() == selectorThread;
+ }
+
+
+ /**
+ * Close all resources, stop accepting, stop the thread.
+ * The actual stop will happen in background.
+ */
+ public void stop() {
+ }
+
+ /**
+ * This may be blocking - involves host resolution, connect.
+ * If the IP address is provided - it shouldn't block.
+ */
+ public void connect(String host, int port,
+ SelectorCallback sc) throws IOException {
+
+ }
+
+ /**
+ * Request a timer event. The thread will generate the events at
+ * a configurable interval - for example no more often than 0.5 sec.
+ *
+ * @param sc
+ * @param nextTimer time to call the timeEvent() callback
+ */
+ public void setTimerEventTime(SelectorCallback sc, long nextTimer) {
+ sc.selectorData.nextTimeEvent = nextTimer;
+ }
+
+ public int readNonBlocking(SelectorCallback sc, ByteBuffer bb)
+ throws IOException {
+ return 0;
+ }
+
+ public int writeNonBlocking(SelectorCallback sc, ByteBuffer reqBuf)
+ throws IOException {
+ return 0;
+ }
+
+ /**
+ *
+ */
+ public int close(SelectorCallback sc) throws IOException {
+ return 0;
+ }
+
+ /**
+ * Create a new server socket, register the callback.
+ */
+ public void acceptor(SelectorCallback sc,
+ int port,
+ InetAddress inet,
+ int backlog,
+ int serverTimeout)
+ throws IOException
+ {
+ }
+
+ /**
+ * For use with daemon tools, inetd - the server socket will be created
+ * externally and passed as a file descriptor. This is needed to run on
+ * port 80 without root.
+ */
+ public void inetdAcceptor(SelectorCallback sc) throws IOException {
+ }
+
+ /**
+ * Change the callback associated with the socket.
+ */
+ public void updateCallback(SelectorCallback old, SelectorCallback sc) {
+ }
+
+ public void writeInterest(SelectorCallback sc, boolean writeInterest) {
+ }
+
+ public void readInterest(SelectorCallback sc, boolean readInterest) {
+ }
+
+}
\ No newline at end of file
Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java?rev=650945&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java Wed Apr 23 10:29:58 2008
@@ -0,0 +1,608 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.tomcat.util.modeler.Registry;
+
+/**
+ * NIO implementation.
+ *
+ * @author Costin Manolache
+ */
+public class SelectorThreadNio extends SelectorThread implements Runnable {
+
+ static Logger log = Logger.getLogger("SelectorThreadNio");
+
+ Selector selector;
+
+ ArrayList<SelectorCallback> readInterest = new ArrayList<SelectorCallback>();
+ ArrayList<SelectorCallback> writeInterest = new ArrayList<SelectorCallback>();
+ ArrayList<SelectorCallback> connectAcceptInterest =
+ new ArrayList<SelectorCallback>();
+
+ AtomicInteger opened = new AtomicInteger();
+ AtomicInteger closed = new AtomicInteger();
+ AtomicInteger loops = new AtomicInteger();
+
+ // actives are also stored in the Selector. This is only updated in the main
+ // thread
+ ArrayList<SelectorCallback> active = new ArrayList<SelectorCallback>();
+
+ boolean debug = false;
+ boolean running = true;
+
+ long lastWakeup = System.currentTimeMillis(); // last time we woke
+ long sleepTime;
+ long nextWakeup; // next scheduled wakeup
+
+ // Normally select will wait for the next time event - if it's
+ // too far in future, maxSleep will override it.
+ private long maxSleep = 1000;
+
+ // Never sleep less than minSleep. This defines the resulution for
+ // time events.
+ private long minSleep = 100;
+
+ boolean daemon = true;
+
+ public SelectorThreadNio() {
+ this(false);
+ }
+
+ public SelectorThreadNio(boolean daemon) {
+ try {
+ selectorThread = new Thread(this);
+ selector = Selector.open();
+ // TODO: start it on-demand, close it when not in use
+ selectorThread.setDaemon(daemon);
+ this.daemon = daemon;
+ selectorThread.start();
+ } catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setDaemon(boolean d) {
+ this.daemon = d;
+
+ }
+
+ public void setName(String n) {
+ selectorThread.setName(n);
+ Registry registry = Registry.getRegistry(null, null);
+ try {
+ registry.registerComponent(this, ":name=" + n, "SelectorThread");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Opened sockets, waiting for something ( close at least )
+ */
+ public int getOpened() {
+ return opened.get();
+ }
+
+ /**
+ * Closed - we're done with them.
+ */
+ public int getClosed() {
+ return closed.get();
+ }
+
+ /**
+ * How many times we looped
+ */
+ public int getLoops() {
+ return loops.get();
+ }
+
+ public long getLastWakeup() {
+ return lastWakeup;
+ }
+
+ public long getTimeSinceLastWakeup() {
+ return System.currentTimeMillis() - lastWakeup;
+ }
+
+ public void stop() {
+ running = false;
+ log.info("Selector thread stop " + this);
+ selector.wakeup();
+ }
+
+ public void run() {
+ log.info("Selector thread start " + this);
+ while (running) {
+ // if we want timeouts - set here.
+ try {
+ loops.incrementAndGet();
+ // Check if new requests were added
+ processPending();
+
+ long now = System.currentTimeMillis();
+ if (nextWakeup > 0 && nextWakeup < now) {
+ // We don't want to iterate on every I/O
+ updateSleepTimeAndProcessTimeouts(now);
+ }
+
+ int selected = selector.select(sleepTime);
+ lastWakeup = System.currentTimeMillis();
+
+ // handle events for existing req first.
+ if (selected != 0) {
+ Set<SelectionKey> sel = selector.selectedKeys();
+
+ Iterator<SelectionKey> i = sel.iterator();
+ while (i.hasNext()) {
+ SelectionKey sk = i.next();
+ // avoid dup - disable the interest
+ // TODO: is this really needed ?
+ int readyOps = sk.readyOps();
+ sk.interestOps(sk.interestOps() & ~readyOps);
+ // Find the request receiving the notification
+ SelectorData sdata = (SelectorData) sk.attachment();
+ SelectorCallback cstate = sdata.callback;
+ if (debug) {
+ log.info("SelectorThread: selected " + cstate + " " + readyOps);
+ }
+
+ if (sk.isValid() && sk.isAcceptable()) {
+ handleAccept(cstate, sk);
+ continue;
+ }
+
+ SocketChannel sc = (SocketChannel) sk.channel();
+ if (!sk.isValid()) {
+ if (debug) {
+ log.info("SelectorThread: !isValid, closed socket " + cstate);
+ }
+ close(sk, cstate, sc, null);
+ continue;
+ }
+
+ try {
+ // callbacks
+ if (sk.isValid() && sk.isConnectable()) {
+ handleConnect(cstate, sc);
+ }
+
+ if (sk.isValid() && sk.isWritable()) {
+ cstate.selectorData.lastWriteResult = 0;
+ cstate.dataWriteable(this);
+ if (cstate.selectorData.lastWriteResult > 0 &&
+ cstate.selectorData.writeInterest) {
+ log.warning("SelectorThread: write interest" +
+ " after incomplete write");
+ }
+ }
+
+ if (sk.isReadable()) {
+ cstate.selectorData.lastReadResult = 0;
+ cstate.dataReceived(this);
+ if (cstate.selectorData.lastReadResult > 0 &&
+ cstate.selectorData.readInterest) {
+ log.warning("SelectorThread: read interest" +
+ " after incomplete read");
+ }
+ }
+ } catch (Throwable t) {
+ t.printStackTrace();
+ close(sk, cstate, sc, t);
+ }
+
+ }
+ // All at once
+ sel.clear();
+ }
+
+ } catch (Throwable e) {
+ log.log(Level.SEVERE, "SelectorThread: Error in select", e);
+ }
+ } // while(running)
+ }
+
+ private void handleConnect(SelectorCallback cstate, SocketChannel sc)
+ throws IOException, SocketException {
+ sc.finishConnect();
+ sc.socket().setSoLinger(true, 0);
+ cstate.connected(this);
+ readInterest(cstate, true);
+ }
+
+ private void handleAccept(SelectorCallback cstate, SelectionKey sk)
+ throws IOException, ClosedChannelException {
+ SelectableChannel selc = sk.channel();
+ ServerSocketChannel ssc=(ServerSocketChannel)selc;
+ SocketChannel sockC = ssc.accept();
+// if (sockC == null) {
+// continue;
+// }
+ sockC.configureBlocking(false);
+ SelectorCallback acb = cstate.connectionAccepted(this, sockC);
+ acb.selectorData.selKey = sockC.register(selector,
+ SelectionKey.OP_READ,
+ acb.selectorData);
+ acb.selectorData.channelData = sockC;
+ acb.selectorData.sel = this;
+ active.add(acb);
+
+ sk.interestOps(sk.interestOps() | SelectionKey.OP_ACCEPT);
+ }
+
+ public void updateCallback(SelectorCallback old,
+ SelectorCallback cstate) {
+ cstate.selectorData = old.selectorData;
+ cstate.selectorData.callback = cstate;
+ // leave old.selectorData around in case some thread is still using it
+ }
+
+ public void writeInterest(SelectorCallback cstate, boolean b) {
+ if (b && (cstate.selectorData.interest | SelectionKey.OP_WRITE) != 0) {
+ return;
+ }
+ if (!b && (cstate.selectorData.interest | SelectionKey.OP_WRITE) == 0) {
+ return;
+ }
+ if (Thread.currentThread() == selectorThread) {
+ cstate.selectorData.writeInterest = b;
+ if (cstate.selectorData.writeInterest) {
+ cstate.selectorData.interest =
+ cstate.selectorData.interest | SelectionKey.OP_WRITE;
+ } else {
+ cstate.selectorData.interest =
+ cstate.selectorData.interest & ~SelectionKey.OP_WRITE;
+ }
+ SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;
+ sk.interestOps(cstate.selectorData.interest);
+ return;
+ }
+ if (!b) {
+ return; // can't remove interest from regular thread
+ }
+ synchronized (writeInterest) {
+ writeInterest.add(cstate);
+ }
+ selector.wakeup();
+ }
+
+ public void readInterest(SelectorCallback cstate, boolean b) {
+ if (Thread.currentThread() == selectorThread) {
+ cstate.selectorData.readInterest = b;
+ selThreadUpdateInterest(cstate);
+ log.info("Registering read interest");
+ return;
+ }
+ if (b && (cstate.selectorData.interest | SelectionKey.OP_READ) != 0) {
+ return;
+ }
+ if (!b && (cstate.selectorData.interest | SelectionKey.OP_READ) == 0) {
+ return;
+ }
+ // Schedule the interest update.
+ synchronized (readInterest) {
+ readInterest.add(cstate);
+ }
+ log.info("Registering pending read interest");
+ selector.wakeup();
+ }
+
+
+ private void selThreadUpdateInterest(SelectorCallback cstate) {
+ SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;
+ if (sk != null && sk.isValid()) {
+ if (debug) {
+ log.info("SelectorThread: process pending: interest " +
+ cstate.selectorData.interest + " for " + cstate);
+ }
+ if (cstate.selectorData.readInterest) {
+ cstate.selectorData.interest =
+ cstate.selectorData.interest | SelectionKey.OP_READ;
+ } else {
+ cstate.selectorData.interest =
+ cstate.selectorData.interest & ~SelectionKey.OP_READ;
+ }
+ sk.interestOps(cstate.selectorData.interest);
+ }
+ }
+
+ private void close(SelectionKey sk,
+ SelectorCallback cstate,
+ Channel ch,
+ Throwable ex) {
+ try {
+ sk.cancel();
+ if (ch instanceof SocketChannel) {
+ SocketChannel sc = (SocketChannel) ch;
+ if (sc.isConnected()) {
+ int o = opened.decrementAndGet();
+ //System.err.println("Close socket, opened=" + o);
+ try {
+ sc.socket().shutdownInput();
+ } catch(IOException io1) {
+ }
+ try {
+ sc.socket().shutdownOutput(); // TCP end to the other side
+ } catch(IOException io1) {
+ }
+ sc.socket().close();
+ }
+ }
+ ch.close();
+ closed.incrementAndGet();
+ cstate.channelClosed(this, ex);
+ active.remove(cstate);
+ } catch (IOException ex2) {
+ log.severe("SelectorThread: Error closing socket " + ex2);
+ ex2.printStackTrace();
+ }
+ }
+
+ // --------------- Socket op abstractions ------------
+
+ @Override
+ public int readNonBlocking(SelectorCallback cstate, ByteBuffer bb)
+ throws IOException {
+ int done = ((SocketChannel) cstate.selectorData.channelData).read(bb);
+ cstate.selectorData.lastReadResult = done;
+ if (done < 0) {
+ if (debug) {
+ log.info("SelectorThread: EOF while reading");
+ }
+ }
+ return done;
+ }
+
+ @Override
+ public int writeNonBlocking(SelectorCallback cstate, ByteBuffer bb)
+ throws IOException {
+ int done = ((SocketChannel) cstate.selectorData.channelData).write(bb);
+ cstate.selectorData.lastWriteResult = done;
+ return done;
+ }
+
+ /**
+ */
+ @Override
+ public void connect(String host, int port, SelectorCallback cstate)
+ throws IOException {
+ SocketChannel socketChannel = SocketChannel.open();
+ socketChannel.configureBlocking(false);
+ int o = opened.incrementAndGet();
+ socketChannel.connect(new InetSocketAddress(host, port));
+ cstate.selectorData.channelData = socketChannel;
+ cstate.selectorData.interest = SelectionKey.OP_CONNECT;
+ synchronized (connectAcceptInterest) {
+ connectAcceptInterest.add(cstate);
+ }
+ selector.wakeup();
+ }
+
+ // TODO
+ public void configureSocket(ByteChannel ch,
+ boolean noDelay) throws IOException {
+ SocketChannel sockC = (SocketChannel) ch;
+ sockC.socket().setTcpNoDelay(noDelay);
+ }
+
+ // TODO
+ public void setSocketOptions(SelectorCallback cstate,
+ int linger,
+ boolean tcpNoDelay,
+ int socketTimeout)
+ throws IOException {
+
+ SocketChannel socketChannel =
+ (SocketChannel) cstate.selectorData.channelData;
+ Socket socket = socketChannel.socket();
+
+ if(linger >= 0 )
+ socket.setSoLinger( true, linger);
+ if( tcpNoDelay )
+ socket.setTcpNoDelay(tcpNoDelay);
+ if( socketTimeout > 0 )
+ socket.setSoTimeout( socketTimeout );
+ }
+
+ @Override
+ public int close(SelectorCallback cstate) throws IOException {
+ close((SelectionKey) cstate.selectorData.selKey, cstate,
+ cstate.selectorData.channelData, null);
+ return 0;
+ }
+
+
+
+ public void acceptor(SelectorCallback cstate,
+ int port,
+ InetAddress inet,
+ int backlog,
+ int serverTimeout)
+ throws IOException
+ {
+ ServerSocketChannel ssc=ServerSocketChannel.open();
+ ServerSocket serverSocket = ssc.socket();
+ SocketAddress sa = null;
+ if (inet == null) {
+ sa = new InetSocketAddress( port );
+ } else {
+ sa = new InetSocketAddress(inet, port);
+ }
+ if (backlog > 0) {
+ serverSocket.bind( sa , backlog);
+ }
+ if( serverTimeout >= 0 ) {
+ serverSocket.setSoTimeout( serverTimeout );
+ }
+
+ ssc.configureBlocking(false);
+
+ cstate.selectorData.channelData = ssc;
+ cstate.selectorData.interest = SelectionKey.OP_ACCEPT;
+ // cstate must return 'OP_ACCEPT' as interest
+ synchronized (connectAcceptInterest) {
+ connectAcceptInterest.add(cstate);
+ }
+ selector.wakeup();
+ }
+
+ public void inetdAcceptor(SelectorCallback cstate) throws IOException {
+ SelectorProvider sp=SelectorProvider.provider();
+
+ Channel ch=sp.inheritedChannel();
+ if(ch!=null ) {
+ //("Inherited: " + ch.getClass().getName());
+ // blocking mode
+ ServerSocketChannel ssc=(ServerSocketChannel)ch;
+ cstate.selectorData.channelData = ssc;
+ cstate.selectorData.interest = SelectionKey.OP_ACCEPT;
+ // cstate must return 'OP_ACCEPT' as interest
+ synchronized (connectAcceptInterest) {
+ connectAcceptInterest.add(cstate);
+ }
+ selector.wakeup();
+ }
+ }
+
+ // -------------- Housekeeping -------------
+ /**
+ * Same as APR connector - iterate over tasks, get
+ * smallest timeout
+ * @throws IOException
+ */
+ void updateSleepTimeAndProcessTimeouts(long now) throws IOException {
+ long min = Long.MAX_VALUE;
+ // TODO: test with large sets, maybe sort
+ Iterator<SelectorCallback> activeIt = active.iterator();
+ while(activeIt.hasNext()) {
+ SelectorCallback cstate = activeIt.next();
+ if (! cstate.selectorData.channelData.isOpen()) {
+ log.info("Closed socket " + cstate.selectorData.channelData);
+ activeIt.remove();
+ close(cstate); // generate callback, increment counters.
+ }
+
+ long t = cstate.selectorData.nextTimeEvent;
+ if (t == 0) {
+ continue;
+ }
+ if (t < now) {
+ // Timeout
+ cstate.timeEvent(this);
+ // TODO: make sure this is updated if it was selected
+ continue;
+ }
+ if (t < min) {
+ min = t;
+ }
+ }
+ long nextSleep = min - now;
+ if (nextSleep > maxSleep) {
+ sleepTime = maxSleep;
+ } else if (nextSleep < minSleep) {
+ sleepTime = minSleep;
+ } else {
+ sleepTime = nextSleep;
+ }
+ nextWakeup = now + sleepTime;
+ }
+
+ private void processPending() throws IOException {
+ synchronized (connectAcceptInterest) {
+ Iterator<SelectorCallback> ci = connectAcceptInterest.iterator();
+
+ while (ci.hasNext()) {
+ SelectorCallback cstate = ci.next();
+ SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;
+
+ // Find host, port - initiate connection
+ try {
+ // Accept interest ?
+ if (cstate.selectorData.channelData instanceof ServerSocketChannel) {
+ ServerSocketChannel socketChannel =
+ (ServerSocketChannel) cstate.selectorData.channelData;
+ cstate.selectorData.sel = this;
+ cstate.selectorData.selKey =
+ socketChannel.register(selector,
+ SelectionKey.OP_ACCEPT, cstate.selectorData);
+
+ cstate.selectorData.channelData = socketChannel;
+ active.add(cstate);
+ } else {
+ SocketChannel socketChannel =
+ (SocketChannel) cstate.selectorData.channelData;
+ cstate.selectorData.sel = this;
+ cstate.selectorData.selKey =
+ socketChannel.register(selector,
+ SelectionKey.OP_CONNECT, cstate.selectorData);
+ active.add(cstate);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ connectAcceptInterest.clear();
+ }
+
+ // Update interest
+ if (readInterest.size() > 0) {
+ synchronized (readInterest) {
+ Iterator<SelectorCallback> ci = readInterest.iterator();
+ while (ci.hasNext()) {
+ SelectorCallback cstate = ci.next();
+ selThreadUpdateInterest(cstate);
+ }
+ readInterest.clear();
+ }
+ }
+ if (writeInterest.size() > 0) {
+ synchronized (writeInterest) {
+ Iterator<SelectorCallback> ci = writeInterest.iterator();
+ while (ci.hasNext()) {
+ SelectorCallback cstate = ci.next();
+ // Fake callback - will update as side effect
+ cstate.dataWriteable(this);
+ }
+ readInterest.clear();
+ }
+ }
+ }
+}
\ No newline at end of file
Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org